
modify pipeline optimizer to only support the mode of sync pipeline training #25065

Merged: 18 commits on Jul 3, 2020
3 changes: 3 additions & 0 deletions paddle/fluid/API.spec
@@ -0,0 +1,3 @@
+paddle.fluid.optimizer.PipelineOptimizer (paddle.fluid.optimizer.PipelineOptimizer, ('document', '2e55a29dbeb874934f7a1a1af3a22b8c'))
+paddle.fluid.optimizer.PipelineOptimizer.__init__ (ArgSpec(args=['self', 'optimizer', 'num_microbatches', 'start_cpu_core_id'], varargs=None, keywords=None, defaults=(1, 0)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
+paddle.fluid.optimizer.PipelineOptimizer.minimize (ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
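
These three entries are the entire public surface of the reworked API: PipelineOptimizer now wraps an inner optimizer and accepts only num_microbatches and start_cpu_core_id, dropping the old asynchronous-mode arguments (cut_list, place_list, concurrency_list, queue_size, sync_steps) still visible in the deleted tests below. A minimal sketch of the new calling convention, derived only from the ArgSpec above; the toy network is an illustrative assumption, and the per-stage device placement a real pipeline program also needs is omitted here:

import paddle.fluid as fluid

# Illustrative toy network; any program producing a scalar loss works the same way.
x = fluid.layers.data(name='x', shape=[1], dtype='float32')
fc = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.reduce_mean(fc)

# Wrap an inner optimizer; num_microbatches controls how each mini-batch is
# split for synchronous pipeline execution (defaults are 1 and 0 per the
# ArgSpec above).
sgd = fluid.optimizer.SGD(learning_rate=0.5)
pipeline_opt = fluid.optimizer.PipelineOptimizer(
    sgd, num_microbatches=2, start_cpu_core_id=0)
pipeline_opt.minimize(loss)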
1,003 changes: 776 additions & 227 deletions python/paddle/fluid/optimizer.py

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -47,7 +47,6 @@ endif()

 if(WIN32)
     LIST(REMOVE_ITEM TEST_OPS test_boxps)
-    LIST(REMOVE_ITEM TEST_OPS test_paddlebox_datafeed)
     LIST(REMOVE_ITEM TEST_OPS test_trainer_desc)
     LIST(REMOVE_ITEM TEST_OPS test_multiprocess_reader_exception)
     LIST(REMOVE_ITEM TEST_OPS test_avoid_twice_initialization)
@@ -87,7 +86,6 @@ endif()
 if(NOT WITH_GPU OR WIN32)
     LIST(REMOVE_ITEM TEST_OPS test_pipeline)
     LIST(REMOVE_ITEM TEST_OPS test_boxps)
-    LIST(REMOVE_ITEM TEST_OPS test_paddlebox_datafeed)
 endif()
 list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290
 list(REMOVE_ITEM TEST_OPS test_lstm_unit_op) # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/5185
112 changes: 0 additions & 112 deletions python/paddle/fluid/tests/unittests/test_boxps.py
@@ -87,117 +87,5 @@ def test_run_cmd(self):
self.assertTrue(ret2 == 0)


class TestBoxPSPreload(unittest.TestCase):
""" TestCases for BoxPS Preload """

def test_boxps_cpu(self):
self.run_boxps_preload(True, True)
self.run_boxps_preload(True, False)

def test_boxps_gpu(self):
self.run_boxps_preload(False, True)
self.run_boxps_preload(False, False)

def run_boxps_preload(self, is_cpu=True, random_with_lineid=False):
program = fluid.Program()
with fluid.program_guard(program):
x = fluid.layers.data(
name='x', shape=[1], dtype='int64', lod_level=0)
y = fluid.layers.data(
name='y', shape=[1], dtype='int64', lod_level=0)
z = layers.data(name='z', shape=[1], dtype='int64')
emb_x, emb_y = _pull_box_sparse([x, y], size=2)
emb_xp = _pull_box_sparse(x, size=2)
concat = layers.concat([emb_x, emb_y], axis=1)
fc = layers.fc(input=concat,
name="fc",
size=1,
num_flatten_dims=1,
bias_attr=False)
loss = layers.reduce_mean(fc)
place = fluid.CPUPlace(
) if is_cpu or not core.is_compiled_with_cuda(
) else fluid.CUDAPlace(0)
exe = fluid.Executor(place)
batch_size = 100

def binary_print(slot, fout):
fout.write(str(len(slot)) + " ")
for e in slot:
fout.write(str(e) + " ")

batch1 = np.ones(
(batch_size, 2, 1)).astype("int64").reshape(batch_size, 2, 1)
filelist = []
place_str = "cpu" if is_cpu else "gpu"
for i in range(2):
filelist.append("test_hdfs_" + place_str + "_" + str(i))
for f in filelist:
with open(f, "w") as fout:
for ins in batch1:
for slot in ins:
binary_print(slot, fout)
fout.write("\n")

def create_dataset():
dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
dataset.set_date("20190930")
dataset.set_use_var([x, y])
dataset.set_batch_size(2)
dataset.set_thread(1)
dataset.set_filelist(filelist)
return dataset

datasets = []
datasets.append(create_dataset())
datasets.append(create_dataset())
optimizer = fluid.optimizer.SGD(learning_rate=0.5)
optimizer = fluid.optimizer.PipelineOptimizer(
optimizer,
cut_list=[],
place_list=[place],
concurrency_list=[1],
queue_size=1,
sync_steps=-1)
optimizer.minimize(loss)

program._pipeline_opt["dump_fields"] = [
"fc.tmp_0", "fc.tmp_0@GRAD", "fake_var", "z",
"reduce_mean_3.tmp_0"
]
# fake_var: not in scope
# z: in scope, but not initialized
# reduce_mean_3.tmp_0: dimension is not right

program._pipeline_opt["dump_fields_path"] = "./dump_log/"
program._pipeline_opt["dump_param"] = ["fc.w_0"]
program._pipeline_opt["enable_random_dump"] = True
program._pipeline_opt["dump_interval"] = 10
program._pipeline_opt["random_with_lineid"] = random_with_lineid

exe.run(fluid.default_startup_program())
datasets[0].load_into_memory()
datasets[0].begin_pass()
datasets[0].slots_shuffle([])
datasets[1].preload_into_memory()
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=datasets[0],
print_period=1)
datasets[0].end_pass(True)
datasets[1].wait_preload_done()
datasets[1].begin_pass()
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=datasets[1],
print_period=1,
debug=True)
datasets[1].end_pass(False)
for f in filelist:
os.remove(f)
if os.path.isdir("dump_log"):
shutil.rmtree("dump_log")


if __name__ == '__main__':
unittest.main()
123 changes: 0 additions & 123 deletions python/paddle/fluid/tests/unittests/test_data_norm_op.py
@@ -437,129 +437,6 @@ def test_check_grad(self):
self.check_grad(['X'], 'Y', no_grad_set=set([]))


class TestDataNormOpWithSyncStats(unittest.TestCase):
"""
test class for data norm op
test forward and backward
"""

def test_sync_stats(self):
if not core.is_compiled_with_cuda():
return
if os.name == 'nt':
print(
'Skip TestDataNormOpWithSyncStats because nccl is not supported on windows'
)
return
x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
emb = layers.embedding(
input=x,
param_attr=fluid.ParamAttr(name="embx"),
size=[10, 2],
is_sparse=False)

dn = layers.data_norm(
input=emb,
name="hehe",
epsilon=1e-4,
param_attr={
"batch_size": 1e4,
"batch_sum": 1e5,
"batch_square": 1e4
},
summary_decay_rate=1,
sync_stats=True) #[-1,3]
loss = layers.mean(dn)

optimizer = fluid.optimizer.SGD(learning_rate=0.5)
optimizer = fluid.optimizer.PipelineOptimizer(
optimizer,
cut_list=[[emb], [loss]],
place_list=[
fluid.CUDAPlace(0), fluid.CUDAPlace(0), fluid.CPUPlace()
],
concurrency_list=[1, 1, 1],
queue_size=1,
sync_steps=10000000, )

all_p = fluid.default_main_program().global_block().all_parameters()
parameter_without_datanorm = []
for e in all_p:
if e.name.find("batch_size") != -1 or e.name.find(
"batch_sq") != -1 or e.name.find("batch_sum") != -1:
continue
parameter_without_datanorm.append(e.name)
optimizer.minimize(loss, parameter_list=parameter_without_datanorm)
place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
#prepare data
batch_size = 1

def binary_print(slot, fout):
num = np.int16(len(slot) + 1)
num.tofile(fout)
a = np.int64(batch_size)
a.tofile(fout)
slot.tofile(fout)

#batch1 = np.array([[0,1], [1,2], [2,3]]).astype("int64").reshape(batch_size,2,1)
#batch2 = np.array([[1,2], [2,3], [3,4]]).astype("int64").reshape(batch_size,2,1)
batch1 = np.ones(
(batch_size, 1)).astype("int64").reshape(batch_size, 1, 1)
batch2 = np.ones(
(batch_size, 1)).astype("int64").reshape(batch_size, 1, 1)
data = [batch1, batch2]
data = [batch1]
filelist = []
for i in range(2):
filelist.append("test_pipeline_input_" + str(i))
for f in filelist:
with open(f, "wb") as fout:
for batch_data in data:
for ins in batch_data:
for slot in ins:
binary_print(slot, fout)

dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
dataset.set_use_var([x])
dataset.set_batch_size(batch_size)
dataset.set_filelist(filelist)

block = fluid.default_startup_program().global_block()
block.append_op(
type='c_comm_init_all', attrs={'ring_id': 0,
'devices': [0, 1]})
with open("main_program", "w") as fout:
fout.write(str(fluid.default_main_program()))
with open("startup_program", "w") as fout:
fout.write(str(fluid.default_startup_program()))
exe.run(fluid.default_startup_program())
emb_t = fluid.global_scope().find_var("embx").get_tensor()
para = np.ones((10, 2)).astype("float32")
emb_t.set(para, place)
for epoch in range(1):
exe.train_from_dataset(
fluid.default_main_program(),
dataset,
thread=2,
debug=False,
fetch_list=[],
fetch_info=[],
print_period=1)
batch_size = np.array(fluid.global_scope().find_var("hehe.batch_size")
.get_tensor())
self.assertEqual(batch_size[0], 10002)
b = np.array(fluid.global_scope().find_var("hehe.batch_sum").get_tensor(
))
self.assertEqual(b[0], 100002)
c = np.array(fluid.global_scope().find_var("hehe.batch_square_sum")
.get_tensor())
self.assertEqual(c[0], 10162)

for f in filelist:
os.remove(f)


class TestDataNormOpErrorr(unittest.TestCase):
def test_errors(self):
with program_guard(Program(), Program()):