MultiNodeChainList with self branching #102

Merged (13 commits, Aug 10, 2017)
12 changes: 10 additions & 2 deletions chainermn/functions/point_to_point_communication.py
@@ -77,8 +77,14 @@ def backward(self, inputs, grad_outputs):
xp = cuda.get_array_module(*inputs)
gw, = grad_outputs
self.comm.send(gw, self.peer_rank, self.peer_tag)
- dummy_var = xp.array([[]], dtype=xp.float32)
- return dummy_var

if inputs == ():
dummy_var = xp.array([], dtype=xp.float32)
else:
var, = inputs
dummy_var = xp.zeros(var.shape, dtype=xp.float32)

return dummy_var,


def send(x, communicator, rank, tag=0):
@@ -104,6 +110,7 @@ def send(x, communicator, rank, tag=0):

"""
chainer.utils.experimental('chainermn.functions.send')
assert rank != communicator.rank
Member: Maybe we can add an assertion comment like "Cannot send to the local process itself" or something.

Member: (Or should it be an internal error that should not happen?)

Contributor Author: Fixed (used ValueError instead).

Member: 👍
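As a sketch of what the final check might look like inside point_to_point_communication.py (the exact ValueError message here is made up, not the committed text):

```python
def send(x, communicator, rank, tag=0):
    chainer.utils.experimental('chainermn.functions.send')
    if rank == communicator.rank:
        # Sending to the local process over MPI would deadlock; self-directed
        # edges are instead handled inside MultiNodeChainList (see link.py).
        raise ValueError('rank must be different from communicator.rank')
    return Send(communicator, peer_rank=rank, peer_tag=tag)(x)
```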

return Send(communicator, peer_rank=rank, peer_tag=tag)(x)


@@ -136,6 +143,7 @@ def recv(communicator, rank, delegate_variable=None, tag=0, device=-1):

"""
chainer.utils.experimental('chainermn.functions.recv')
assert rank != communicator.rank
Member: An assertion comment here as well.

if delegate_variable is None:
return Recv(
communicator,
58 changes: 36 additions & 22 deletions chainermn/link.py
@@ -1,3 +1,5 @@
from six.moves import queue

import chainer
import chainermn
import chainermn.communicators
@@ -126,14 +128,10 @@ def add_link(self, link, rank_in=None, rank_out=None):
if isinstance(rank_out, int):
rank_out = [rank_out]

- assert rank_in is None or self._comm.rank not in rank_in,\
-     "cannot specify self rank for rank_in"
- assert rank_out is None or self._comm.rank not in rank_out,\
-     "cannot specify self rank for rank_out"

self._rank_inouts.append((rank_in, rank_out))

def __call__(self, *inputs):
comm_queue = queue.Queue()
Contributor: How about checking that comm_queue is empty at the end of this function?

Contributor Author: Thank you. I fixed it.
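A minimal sketch of that suggestion, placed just before __call__ returns; the assertion message is illustrative, not the committed wording:

```python
# Every value that a self-directed rank_out put on comm_queue should have
# been consumed by a matching rank_in == self._comm.rank by this point.
assert comm_queue.empty(), \
    'comm_queue must be empty at the end of __call__'
```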

y = None
delegate_variable = None

@@ -157,23 +155,30 @@ def __call__(self, *inputs):
# Preprocess: receiving inputs from the other machines.
xs = []
for _rank_in in rank_in:
- _x = chainermn.functions.recv(
-     self._comm,
-     rank=_rank_in,
-     delegate_variable=delegate_variable,
-     device=self._device_id)
if _rank_in == self._comm.rank:
# Receive inputs from itself.
if delegate_variable is None:
_x = comm_queue.get()
else:
_x = chainermn.functions.pseudo_connect(
delegate_variable,
comm_queue.get())
else:
_x = chainermn.functions.recv(
self._comm,
rank=_rank_in,
delegate_variable=delegate_variable,
device=self._device_id)

xs.append(_x)

# Guarantee the backward path to the previous graph
# component to be executed in the last to avoid dead-lock.
if delegate_variable is not None \
and _x.creator is not None:
_x.creator.rank = -1

- xs.append(_x)
delegate_variable = _x

- # Prevent "double-backwarding," i.e., backprop
- # the same edge more than twice.
- delegate_variable = None
# Prevent "double-backwarding," i.e., backprop
# the same edge more than twice.
Member: More than once? (Only == 1 is good and >= 2 is not good, right?)

Contributor Author: Exactly. Don't we say "more than twice" in this case?

Member (keisukefukuda, Aug 9, 2017): For a ratio there is no big difference between "> 2x" and ">= 2x", since both translate to the Japanese "2倍以上" ("twice or more"), but I don't think that applies here. "... should be called exactly once" would be less confusing.

Contributor Author: Thank you for the suggestion. Fixed it.

Member: 👍

delegate_variable = None

# Actual forward.
x = f(*tuple(xs))
@@ -186,17 +191,26 @@ def __call__(self, *inputs):

else: # Send outputs to the other machines.
for i_comp, _rank_out in enumerate(rank_out):
- if i_comp == 0:
if _rank_out == self._comm.rank:
# Send outputs to itself.
if delegate_variable is not None:
x = chainermn.functions.pseudo_connect(
delegate_variable,
x)
comm_queue.put(x)
delegate_variable = x
elif i_comp == 0:
delegate_variable = chainermn.functions.send(
x, self._comm,
rank=_rank_out)
else:
# If the model has multiple targets for send,
# we must guarantee backwards of each send to be
# called in the reversed order.
- x = chainermn.functions.pseudo_connect(
-     delegate_variable,
-     x)
if delegate_variable is not None:
x = chainermn.functions.pseudo_connect(
delegate_variable,
x)
delegate_variable = chainermn.functions.send(
x, self._comm,
rank=_rank_out)
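For context, a hypothetical minimal model using the self-branching wiring that these changes enable. SomeLinearBlock and SelfBranchingModel are made-up names; the rank_in/rank_out pattern mirrors BranchParent2 and TwistFirst in the tests below:

```python
import chainer
import chainer.functions as F
import chainer.links as L
import chainermn


class SomeLinearBlock(chainer.Chain):
    # Stand-in component; any chainer.Chain works here.
    def __init__(self, size):
        super(SomeLinearBlock, self).__init__()
        with self.init_scope():
            self.linear = L.Linear(size, size)

    def __call__(self, x):
        return F.relu(self.linear(x))


class SelfBranchingModel(chainermn.MultiNodeChainList):
    # Both links live on the same process: the first link's output is queued
    # locally because rank_out == comm.rank, and the second link pops it back
    # because rank_in == comm.rank, so no MPI send/recv is issued on this edge.
    def __init__(self, size, comm):
        super(SelfBranchingModel, self).__init__(comm=comm)
        self.add_link(SomeLinearBlock(size), rank_in=None, rank_out=comm.rank)
        self.add_link(SomeLinearBlock(size), rank_in=comm.rank, rank_out=None)
```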
88 changes: 88 additions & 0 deletions tests/test_link.py
@@ -92,6 +92,34 @@ def __init__(self, size, comm, rank_children):
self.add_link(BranchSubB(size), rank_in=rank_children, rank_out=None)


class BranchParent2(chainermn.MultiNodeChainList):
def __init__(self, size, comm, rank_children):
super(BranchParent2, self).__init__(comm=comm)
ranks = [comm.rank] + rank_children
self.add_link(BranchSubA(size), rank_in=None, rank_out=ranks)
self.add_link(BranchSubA(size), rank_in=comm.rank, rank_out=comm.rank)
self.add_link(BranchSubB(size), rank_in=ranks, rank_out=None)


class BranchParent3(chainermn.MultiNodeChainList):
def __init__(self, size, comm, rank_children):
super(BranchParent3, self).__init__(comm=comm)
ranks = rank_children + [comm.rank]
self.add_link(BranchSubA(size), rank_in=None, rank_out=ranks)
self.add_link(BranchSubA(size), rank_in=comm.rank, rank_out=comm.rank)
self.add_link(BranchSubB(size), rank_in=ranks, rank_out=None)


class BranchParent4(chainermn.MultiNodeChainList):
def __init__(self, size, comm, rank_children):
super(BranchParent4, self).__init__(comm=comm)
ranks = rank_children + [comm.rank]
ranks = ranks[1:] + ranks[0:1]
self.add_link(BranchSubA(size), rank_in=None, rank_out=ranks)
self.add_link(BranchSubA(size), rank_in=comm.rank, rank_out=comm.rank)
self.add_link(BranchSubB(size), rank_in=ranks, rank_out=None)


class BranchChild(chainermn.MultiNodeChainList):
def __init__(self, size, comm, rank_parent):
super(BranchChild, self).__init__(comm=comm)
@@ -101,6 +129,33 @@ def __init__(self, size, comm, rank_parent):
rank_out=rank_parent)


class TwistFirst(chainermn.MultiNodeChainList):
def __init__(self, size, comm, rank_next):
super(TwistFirst, self).__init__(comm=comm)
self.add_link(BranchSubA(size), rank_in=None, rank_out=rank_next)
self.add_link(BranchSubA(size), rank_in=rank_next, rank_out=None)


class Twist(chainermn.MultiNodeChainList):
def __init__(self, size, comm, rank_prev, rank_next):
super(Twist, self).__init__(comm=comm)
self.add_link(BranchSubA(size), rank_in=rank_prev, rank_out=comm.rank)
self.add_link(BranchSubA(size), rank_in=None, rank_out=rank_prev)
self.add_link(BranchSubA(size), rank_in=None, rank_out=rank_next)
self.add_link(BranchSubA(size), rank_in=rank_next, rank_out=comm.rank)
self.add_link(
BranchSubB(size),
rank_in=[comm.rank, comm.rank],
rank_out=None)


class TwistLast(chainermn.MultiNodeChainList):
def __init__(self, size, comm, rank_prev):
super(TwistLast, self).__init__(comm=comm)
self.add_link(BranchSubA(size), rank_in=rank_prev, rank_out=None)
self.add_link(BranchSubA(size), rank_in=None, rank_out=rank_prev)


@chainer.testing.parameterize(
{'gpu': True},
{'gpu': False},
@@ -198,3 +253,36 @@ def check_branching_model(self, parent_model):

def test_branching_model1(self):
self.check_branching_model(BranchParent1)

def test_branching_model2(self):
self.check_branching_model(BranchParent2)

def test_branching_model3(self):
self.check_branching_model(BranchParent3)

def test_branching_model4(self):
self.check_branching_model(BranchParent4)

def test_twisting_model(self):
n, d = 100, 10
X = np.random.randn(n, d).astype(np.float32)
Y = (np.random.rand(n) * 2).astype(np.int32)

if self.communicator.rank == 0:
model = L.Classifier(
TwistFirst(d, self.communicator, self.rank_next))
elif self.communicator.rank == self.communicator.size - 1:
model = L.Classifier(
TwistLast(d, self.communicator, self.rank_prev))
else:
model = L.Classifier(Twist(
d, self.communicator, self.rank_prev, self.rank_next))

if self.gpu:
model.to_gpu()
X = chainer.cuda.to_gpu(X)
Y = chainer.cuda.to_gpu(Y)

for i in range(n):
err = model(X[i:i + 1], Y[i:i + 1])
err.backward()