Add parallel executor tests #9950

Merged: 9 commits, Apr 18, 2018
11 changes: 6 additions & 5 deletions python/paddle/fluid/parallel_executor.py
@@ -16,6 +16,7 @@
import multiprocessing
import framework
import executor
import warnings
import sys

__all__ = ['ParallelExecutor']
@@ -62,8 +63,8 @@ def __init__(self,
main_program=test_program,
share_vars_from=train_exe)

train_loss, = train_exe.run([loss.name], feed_dict=feed_dict)
test_loss, = test_exe.run([loss.name], feed_dict=feed_dict)
train_loss, = train_exe.run([loss.name], feed=feed_dict)
test_loss, = test_exe.run([loss.name], feed=feed_dict)
"""

self._places = []
@@ -103,8 +104,8 @@ def __init__(self,

self.persistable_vars = [
v.name
for v in filter(lambda var: \
var.persistable and var.type != core.VarDesc.VarType.RAW,
for v in filter(
lambda var: var.persistable and var.type != core.VarDesc.VarType.RAW,
main.list_vars())
]

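As an aside, the reformatted filter above just collects the names of persistable variables that are not RAW-typed; a hypothetical plain list comprehension with the same behavior (not part of this PR, and using an illustrative variable name) could read:

# Hypothetical equivalent of the self.persistable_vars assignment above.
persistable_var_names = [
    v.name for v in main.list_vars()
    if v.persistable and v.type != core.VarDesc.VarType.RAW
]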
@@ -163,7 +164,7 @@ def run(self, fetch_list, feed=None, feed_dict=None):
Returns: fetched result list.

"""
if feed is None:
if feed is None and feed_dict is not None:
feed = feed_dict
print >> sys.stderr, "`feed_dict` is deprecated. Please use `feed=`"

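As a usage sketch of the shim above (assuming an already constructed executor `exe`, a fetchable variable `loss`, and numpy arrays `img` and `label`; none of these names come from this hunk):

# Preferred form after this PR:
loss_val, = exe.run([loss.name], feed={"image": img, "label": label})
# Deprecated form: still accepted, but it is forwarded to `feed` only when
# `feed` itself was not passed, and a deprecation notice goes to stderr.
loss_val, = exe.run([loss.name], feed_dict={"image": img, "label": label})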
61 changes: 52 additions & 9 deletions python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -200,14 +200,29 @@ class TestParallelExecutorBase(unittest.TestCase):
def check_network_convergence(self,
method,
memory_opt=True,
iter=10,
iter=50,
batch_size=None,
allow_op_delay=False,
feed_dict=None):
feed_dict=None,
seed=None,
use_parallel_executor=True):
def run_executor(exe, feed, fetch_list, program=None):
if isinstance(exe, fluid.ParallelExecutor):
res = exe.run(fetch_list=fetch_list, feed=feed)
elif isinstance(exe, fluid.Executor):
if program is None:
program = fluid.default_main_program()
res = exe.run(program=program, feed=feed, fetch_list=fetch_list)
else:
raise ValueError('Unknown executor type')
return res

main = fluid.Program()
startup = fluid.Program()
startup.random_seed = 1 # Fix random seed
with fluid.program_guard(main, startup):
if seed is not None:
startup.random_seed = seed
loss = method(use_feed=feed_dict is not None)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
@@ -217,18 +232,24 @@ def check_network_convergence(self,
startup_exe = fluid.Executor(place)
startup_exe.run(startup)

exe = fluid.ParallelExecutor(
True, loss_name=loss.name, allow_op_delay=allow_op_delay)
if use_parallel_executor:
exe = fluid.ParallelExecutor(
True, loss_name=loss.name, allow_op_delay=allow_op_delay)
else:
exe = fluid.Executor(place=place)

if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count()
begin = time.time()
first_loss, = exe.run([loss.name], feed=feed_dict)
first_loss, = run_executor(
exe=exe, feed=feed_dict, fetch_list=[loss.name])
first_loss = numpy.array(first_loss)

for i in xrange(iter):
exe.run([], feed=feed_dict)
run_executor(exe=exe, feed=feed_dict, fetch_list=[])

last_loss, = exe.run([loss.name], feed=feed_dict)
last_loss, = run_executor(
exe=exe, feed=feed_dict, fetch_list=[loss.name])
end = time.time()

if batch_size is not None:
@@ -239,6 +260,7 @@ def check_network_convergence(self,

print first_loss, last_loss
# self.assertGreater(first_loss[0], last_loss[0])
return first_loss, last_loss


class TestMNIST(TestParallelExecutorBase):
@@ -268,6 +290,27 @@ def test_simple_fc(self):
simple_fc_net, feed_dict={"image": img,
"label": label})

def test_simple_fc_parallel_accuracy(self):
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
single_first_loss, single_last_loss = self.check_network_convergence(
method=simple_fc_net,
seed=1000,
feed_dict={"image": img,
"label": label},
use_parallel_executor=False)
parallel_first_loss, parallel_last_loss = self.check_network_convergence(
method=simple_fc_net,
seed=1000,
feed_dict={"image": img,
"label": label},
use_parallel_executor=True)

for p_f in parallel_first_loss:
self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
for p_l in parallel_last_loss:
self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
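# Editor's note, a hypothetical alternative not in this PR: the parallel
# executor returns one loss value per device, so the element-wise loop above
# could also be written with numpy's broadcasting comparison helper, e.g.
#   numpy.testing.assert_allclose(
#       parallel_first_loss, single_first_loss[0], atol=1e-6)
#   numpy.testing.assert_allclose(
#       parallel_last_loss, single_last_loss[0], atol=1e-6)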

def test_batchnorm_fc(self):
self.check_network_convergence(fc_with_batchnorm)
img = numpy.zeros(shape=[32, 784], dtype='float32')
@@ -496,10 +539,10 @@ def test_parallel_testing(self):
share_vars_from=train_exe)

for i in xrange(5):
test_loss, = test_exe.run([loss.name], feed_dict=feed_dict)
test_loss, = test_exe.run([loss.name], feed=feed_dict)
test_loss = numpy.array(test_loss)

train_loss, = train_exe.run([loss.name], feed_dict=feed_dict)
train_loss, = train_exe.run([loss.name], feed=feed_dict)
train_loss = numpy.array(train_loss)
self.assertTrue(
numpy.allclose(