Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallel executor tests #9950

Merged
merged 9 commits into from
Apr 18, 2018
20 changes: 11 additions & 9 deletions python/paddle/fluid/parallel_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ def __init__(self,
main_program=test_program,
share_vars_from=train_exe)

train_loss, = train_exe.run([loss.name], feed_dict=feed_dict)
test_loss, = test_exe.run([loss.name], feed_dict=feed_dict)
train_loss, = train_exe.run([loss.name], feed=feed_dict)
test_loss, = test_exe.run([loss.name], feed=feed_dict)
"""

self._places = []
Expand Down Expand Up @@ -123,22 +123,24 @@ def __init__(self,
allow_op_delay)
self.scope = scope

def run(self, fetch_list, feed_dict={}):
def run(self, fetch_list, feed={}, feed_dict={}):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a warning to notify users that feed_dict is deprecated.

"""
:param fetch_list: A list of variable names that will be fetched.
:param feed_dict: A dict mapping for feed variable name to LoDTensor
:param feed: A dict mapping for feed variable name to LoDTensor
or numpy array.
:return: fetched value list.
"""
if not isinstance(feed_dict, dict):
raise TypeError("feed_dict should be a dict")
if feed == {}:
feed = feed_dict
if not isinstance(feed, dict):
raise TypeError("feed should be a dict")

feed_tensor_dict = {}
for i, feed_name in enumerate(feed_dict):
feed_tensor = feed_dict[feed_name]
for i, feed_name in enumerate(feed):
feed_tensor = feed[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
feed_tensor.set(feed_dict[feed_name], self._act_places[0])
feed_tensor.set(feed[feed_name], self._act_places[0])
feed_tensor_dict[feed_name] = feed_tensor

fetch_var_name = '@FETCHED_VAR_NAME@'
Expand Down
64 changes: 54 additions & 10 deletions python/paddle/fluid/tests/unittests/test_parallel_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,30 @@ class TestParallelExecutorBase(unittest.TestCase):
def check_network_convergence(self,
method,
memory_opt=True,
iter=10,
iter=50,
batch_size=None,
allow_op_delay=False,
feed_dict={}):
feed_dict={},
seed=None,
use_parallel_executor=True):
def run_executor(exe, feed, fetch_list, program=None):
if isinstance(exe, fluid.ParallelExecutor):
res = exe.run(fetch_list=fetch_list, feed=feed)
elif isinstance(exe, fluid.Executor):
if program is None:
program = fluid.default_main_program()
res = exe.run(program=program, feed=feed, fetch_list=fetch_list)
else:
raise ValueError('Unkown type exe')
return res

main = fluid.Program()
startup = fluid.Program()
startup.random_seed = 1 # Fix random seed
with fluid.program_guard(main, startup):
if seed is not None:
startup.random_seed = seed
main.random_seed = seed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main.random_seed = seed is unnecessary.

loss = method(use_feed=len(feed_dict) > 0)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
Expand All @@ -217,18 +233,24 @@ def check_network_convergence(self,
startup_exe = fluid.Executor(place)
startup_exe.run(startup)

exe = fluid.ParallelExecutor(
True, loss_name=loss.name, allow_op_delay=allow_op_delay)
if use_parallel_executor:
exe = fluid.ParallelExecutor(
True, loss_name=loss.name, allow_op_delay=allow_op_delay)
else:
exe = fluid.Executor(place=place)

if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count()
begin = time.time()
first_loss, = exe.run([loss.name], feed_dict=feed_dict)
first_loss, = run_executor(
exe=exe, feed=feed_dict, fetch_list=[loss.name])
first_loss = numpy.array(first_loss)

for i in xrange(iter):
exe.run([], feed_dict=feed_dict)
run_executor(exe=exe, feed=feed_dict, fetch_list=[])

last_loss, = exe.run([loss.name], feed_dict=feed_dict)
last_loss, = run_executor(
exe=exe, feed=feed_dict, fetch_list=[loss.name])
end = time.time()

if batch_size is not None:
Expand All @@ -239,6 +261,7 @@ def check_network_convergence(self,

print first_loss, last_loss
# self.assertGreater(first_loss[0], last_loss[0])
return first_loss, last_loss


class TestMNIST(TestParallelExecutorBase):
Expand Down Expand Up @@ -268,6 +291,27 @@ def test_simple_fc(self):
simple_fc_net, feed_dict={"image": img,
"label": label})

def test_simple_fc_parallel_accuracy(self):
    """Compare ParallelExecutor losses against the plain Executor.

    Runs ``check_network_convergence`` on ``simple_fc_net`` twice with the
    same seed (1000) and the same constant feed: once with
    ``use_parallel_executor=False`` and once with
    ``use_parallel_executor=True``. Every value in the parallel run's first
    and last loss must be within 1e-6 of element 0 of the corresponding
    single-executor loss.
    """
    img = numpy.zeros(shape=[32, 784], dtype='float32')
    label = numpy.ones(shape=[32, 1], dtype='int64')

    # Reference run on the non-parallel executor.
    single_first_loss, single_last_loss = self.check_network_convergence(
        method=simple_fc_net,
        seed=1000,
        feed_dict={"image": img,
                   "label": label},
        use_parallel_executor=False)

    # Same network, seed and feed on the parallel executor.
    parallel_first_loss, parallel_last_loss = self.check_network_convergence(
        method=simple_fc_net,
        seed=1000,
        feed_dict={"image": img,
                   "label": label},
        use_parallel_executor=True)

    # The parallel loss may hold several values; each is compared against
    # the single-executor value. assertAlmostEqual replaces the deprecated
    # assertAlmostEquals alias.
    for p_f in parallel_first_loss:
        self.assertAlmostEqual(p_f, single_first_loss[0], delta=1e-6)
    for p_l in parallel_last_loss:
        self.assertAlmostEqual(p_l, single_last_loss[0], delta=1e-6)

def test_batchnorm_fc(self):
self.check_network_convergence(fc_with_batchnorm)
img = numpy.zeros(shape=[32, 784], dtype='float32')
Expand Down Expand Up @@ -496,10 +540,10 @@ def test_parallel_testing(self):
share_vars_from=train_exe)

for i in xrange(5):
test_loss, = test_exe.run([loss.name], feed_dict=feed_dict)
test_loss, = test_exe.run([loss.name], feed=feed_dict)
test_loss = numpy.array(test_loss)

train_loss, = train_exe.run([loss.name], feed_dict=feed_dict)
train_loss, = train_exe.run([loss.name], feed=feed_dict)
train_loss = numpy.array(train_loss)
self.assertTrue(
numpy.allclose(
Expand Down Expand Up @@ -649,5 +693,5 @@ def test_all(self):
for i in xrange(10):
cur_batch = next(data)
print map(numpy.array,
pe.run(feed_dict=feeder.feed(cur_batch),
pe.run(feed=feeder.feed(cur_batch),
fetch_list=[avg_cost.name]))[0]