From 77e05062dcab2ad6ac4178ca8b5f878b6f8bda92 Mon Sep 17 00:00:00 2001 From: levelfour Date: Thu, 1 Mar 2018 14:58:01 +0900 Subject: [PATCH 1/4] Add a dummy variable to inputs of send, to avoid deadlock. --- .../functions/point_to_point_communication.py | 25 +++-- .../test_point_to_point_communication.py | 92 +++++++++++++++---- 2 files changed, 91 insertions(+), 26 deletions(-) diff --git a/chainermn/functions/point_to_point_communication.py b/chainermn/functions/point_to_point_communication.py index 24c1a3f3..fb38f80e 100644 --- a/chainermn/functions/point_to_point_communication.py +++ b/chainermn/functions/point_to_point_communication.py @@ -1,4 +1,5 @@ import collections +import numpy import chainer from chainer import cuda @@ -23,22 +24,27 @@ def label(self): def forward(self, inputs): xp = cuda.get_array_module(*inputs) - if len(inputs) == 1: - inputs = inputs[0] + # The last input is dummy variable, to retain gradient computation + # of this function. + xs = inputs[:-1] - self.comm.send(inputs, self.peer_rank, self.peer_tag) + if len(xs) == 1: + xs = xs[0] + + self.comm.send(xs, self.peer_rank, self.peer_tag) # Return an empty variable, which serves as "delegate_variable." return xp.array([], dtype=xp.float32), def backward(self, inputs, grad_outputs): xp = cuda.get_array_module(*inputs) + dummy_grad = xp.array([], dtype=xp.float32) with cuda.get_device_from_array(*inputs): grad = self.comm.recv(self.peer_rank, self.peer_tag) if isinstance(grad, tuple): - return tuple([xp.array(gy) for gy in grad]) + return tuple([xp.array(gy) for gy in grad] + [dummy_grad]) else: - return xp.array(grad), + return xp.array(grad), dummy_grad class Recv(chainer.Function): @@ -135,12 +141,17 @@ def send(x, communicator, rank, tag=0): 'rank must be different from communicator rank, ' 'otherwise deadlock occurs') + # Dummy variable to retain gradient computation of send, + # otherwise the corresponding recv will cause deadlock in backward + # in the case where all inputs for this function does not require_grad. + dummy_var = chainer.Variable(numpy.array([], dtype=numpy.float32)) + if isinstance(x, collections.Iterable): delegate_variable = Send( - communicator, peer_rank=rank, peer_tag=tag)(*x) + communicator, peer_rank=rank, peer_tag=tag)(*x, dummy_var) else: delegate_variable = Send( - communicator, peer_rank=rank, peer_tag=tag)(x) + communicator, peer_rank=rank, peer_tag=tag)(x, dummy_var) delegate_variable.name = 'delegate_variable' return delegate_variable diff --git a/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py b/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py index 15b57474..83fa32cd 100644 --- a/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py +++ b/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py @@ -1,8 +1,10 @@ import copy import functools +import unittest import chainer import chainer.testing +import chainer.testing.attr import numpy import pytest @@ -10,9 +12,9 @@ import chainermn.functions -class PointToPointCommunication(object): +class TestPointToPointCommunication(unittest.TestCase): - def __init__(self, gpu): + def setup(self, gpu): self.gpu = gpu if self.gpu: self.communicator = chainermn.create_communicator('hierarchical') @@ -55,7 +57,7 @@ def _init_w(self, l): return 1.0 * numpy.arange(100).reshape(10, 10).astype(numpy.float32) \ / ((l + 1) * 100) - def test_communication(self): + def check_communication(self): if self.communicator.rank == 0: # Input process. y = self.f(self.model(self.x)) @@ -99,7 +101,16 @@ def test_communication(self): y, self.communicator, self.rank_send) err.backward() - def test_retain(self): + def test_communication_cpu(self): + self.setup(False) + self.check_communication() + + @chainer.testing.attr.gpu + def test_communication_gpu(self): + self.setup(True) + self.check_communication() + + def check_retain(self): if self.communicator.rank == 0: # Starting process. t = copy.copy(self.x) @@ -127,6 +138,15 @@ def test_retain(self): y, self.communicator, self.rank_send) err.backward() + def test_retain_cpu(self): + self.setup(False) + self.check_retain() + + @chainer.testing.attr.gpu + def test_retain_gpu(self): + self.setup(True) + self.check_retain() + def check_tuple_communication(self, length): if self.communicator.rank == 0: y = [] @@ -153,25 +173,59 @@ def check_tuple_communication(self, length): y, self.communicator, self.rank_send) err.backward() - def test_tuple_communication1(self): + def test_tuple_communication1_cpu(self): + self.setup(False) + self.check_tuple_communication(1) + + def test_tuple_communication2_cpu(self): + self.setup(False) + self.check_tuple_communication(2) + + @chainer.testing.attr.gpu + def test_tuple_communication1_gpu(self): + self.setup(True) self.check_tuple_communication(1) - def test_tuple_communication2(self): + @chainer.testing.attr.gpu + def test_tuple_communication2_gpu(self): + self.setup(True) self.check_tuple_communication(2) -def test_cpu(): - p2pcom = PointToPointCommunication(False) - p2pcom.test_communication() - p2pcom.test_retain() - p2pcom.test_tuple_communication1() - p2pcom.test_tuple_communication2() +class TestNonVariableInput(unittest.TestCase): + def setUp(self): + self.communicator = chainermn.create_communicator('naive') + self.rank_send = (self.communicator.rank + 1) % self.communicator.size + self.rank_recv = (self.communicator.rank - 1) % self.communicator.size + + def test_non_variable_send(self): + """Checks if backward will be called even if inputs are not Variable. + + This test confirms whether deadlock occurs when numpy/cupy array is + given as an input of send. + In this case, the input will be converted to chainer Variable without + ``requires_grad``, thus ``backward`` will not be called without any + modification. + """ + if self.communicator.rank == 0: +# x = chainer.Variable(numpy.ones((1, 10)).astype(numpy.float32)) + x = numpy.ones((1, 10)).astype(numpy.float32) + phi = chainermn.functions.send(x, self.communicator, rank=self.rank_send) + x = chainermn.functions.pseudo_connect(phi, x) + y = chainer.functions.sum(x) + t = numpy.array(0).astype(numpy.float32) + z = chainer.functions.mean_squared_error(y, t) + z.backward() + + elif self.communicator.rank == self.communicator.size - 1: + x = chainermn.functions.recv(self.communicator, rank=self.rank_recv) + y = chainer.functions.sum(x) + t = numpy.array(0).astype(numpy.float32) + z = chainer.functions.mean_squared_error(y, t) + z.backward() -@chainer.testing.attr.gpu -def test_gpu(): - p2pcom = PointToPointCommunication(True) - p2pcom.test_communication() - p2pcom.test_retain() - p2pcom.test_tuple_communication1() - p2pcom.test_tuple_communication2() + else: + x = chainermn.functions.recv(self.communicator, rank=self.rank_recv) + phi = chainermn.functions.send(x, self.communicator, rank=self.rank_next) + phi.backward() From e55f03bf2babb3ff47d139523c4da5c4acd70440 Mon Sep 17 00:00:00 2001 From: Fukumu Tsutsumi Date: Thu, 1 Mar 2018 15:35:17 +0900 Subject: [PATCH 2/4] cupy --- chainermn/functions/point_to_point_communication.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chainermn/functions/point_to_point_communication.py b/chainermn/functions/point_to_point_communication.py index fb38f80e..ea6ded90 100644 --- a/chainermn/functions/point_to_point_communication.py +++ b/chainermn/functions/point_to_point_communication.py @@ -141,10 +141,12 @@ def send(x, communicator, rank, tag=0): 'rank must be different from communicator rank, ' 'otherwise deadlock occurs') + xp = cuda.get_array_module(*x) + # Dummy variable to retain gradient computation of send, # otherwise the corresponding recv will cause deadlock in backward # in the case where all inputs for this function does not require_grad. - dummy_var = chainer.Variable(numpy.array([], dtype=numpy.float32)) + dummy_var = chainer.Variable(xp.array([], dtype=xp.float32)) if isinstance(x, collections.Iterable): delegate_variable = Send( From c5303d9d80829276098ec80aa60f7d57bd8bf471 Mon Sep 17 00:00:00 2001 From: levelfour Date: Mon, 5 Mar 2018 11:17:57 +0900 Subject: [PATCH 3/4] fix for PEP8 --- chainermn/functions/point_to_point_communication.py | 6 +++--- .../test_point_to_point_communication.py | 13 ++++++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/chainermn/functions/point_to_point_communication.py b/chainermn/functions/point_to_point_communication.py index ea6ded90..5c29a340 100644 --- a/chainermn/functions/point_to_point_communication.py +++ b/chainermn/functions/point_to_point_communication.py @@ -1,5 +1,4 @@ import collections -import numpy import chainer from chainer import cuda @@ -148,9 +147,10 @@ def send(x, communicator, rank, tag=0): # in the case where all inputs for this function does not require_grad. dummy_var = chainer.Variable(xp.array([], dtype=xp.float32)) - if isinstance(x, collections.Iterable): + if isinstance(x, list) or isinstance(x, tuple): + inputs = x + type(x)([dummy_var]) delegate_variable = Send( - communicator, peer_rank=rank, peer_tag=tag)(*x, dummy_var) + communicator, peer_rank=rank, peer_tag=tag)(*inputs) else: delegate_variable = Send( communicator, peer_rank=rank, peer_tag=tag)(x, dummy_var) diff --git a/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py b/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py index 83fa32cd..aa82fff5 100644 --- a/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py +++ b/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py @@ -209,9 +209,9 @@ def test_non_variable_send(self): modification. """ if self.communicator.rank == 0: -# x = chainer.Variable(numpy.ones((1, 10)).astype(numpy.float32)) x = numpy.ones((1, 10)).astype(numpy.float32) - phi = chainermn.functions.send(x, self.communicator, rank=self.rank_send) + phi = chainermn.functions.send( + x, self.communicator, rank=self.rank_send) x = chainermn.functions.pseudo_connect(phi, x) y = chainer.functions.sum(x) t = numpy.array(0).astype(numpy.float32) @@ -219,13 +219,16 @@ def test_non_variable_send(self): z.backward() elif self.communicator.rank == self.communicator.size - 1: - x = chainermn.functions.recv(self.communicator, rank=self.rank_recv) + x = chainermn.functions.recv( + self.communicator, rank=self.rank_recv) y = chainer.functions.sum(x) t = numpy.array(0).astype(numpy.float32) z = chainer.functions.mean_squared_error(y, t) z.backward() else: - x = chainermn.functions.recv(self.communicator, rank=self.rank_recv) - phi = chainermn.functions.send(x, self.communicator, rank=self.rank_next) + x = chainermn.functions.recv( + self.communicator, rank=self.rank_recv) + phi = chainermn.functions.send( + x, self.communicator, rank=self.rank_next) phi.backward() From a04d341c8566922af8fa94e51795bd7334dae142 Mon Sep 17 00:00:00 2001 From: levelfour Date: Mon, 5 Mar 2018 12:53:34 +0900 Subject: [PATCH 4/4] skip test for single process --- chainermn/functions/point_to_point_communication.py | 2 -- .../functions_tests/test_point_to_point_communication.py | 4 ++++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/chainermn/functions/point_to_point_communication.py b/chainermn/functions/point_to_point_communication.py index 5c29a340..854fc868 100644 --- a/chainermn/functions/point_to_point_communication.py +++ b/chainermn/functions/point_to_point_communication.py @@ -1,5 +1,3 @@ -import collections - import chainer from chainer import cuda import chainer.utils diff --git a/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py b/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py index aa82fff5..b31f2210 100644 --- a/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py +++ b/tests/chainermn_tests/functions_tests/test_point_to_point_communication.py @@ -196,6 +196,10 @@ class TestNonVariableInput(unittest.TestCase): def setUp(self): self.communicator = chainermn.create_communicator('naive') + + if self.communicator.size < 2: + pytest.skip("This test is for multinode") + self.rank_send = (self.communicator.rank + 1) % self.communicator.size self.rank_recv = (self.communicator.rank - 1) % self.communicator.size