Make word2vec use parallel.do in CI #7970

Merged (4 commits) on Jan 31, 2018
paddle/operators/sum_op.h: 38 additions & 11 deletions

@@ -68,32 +68,59 @@ class SumKernel : public framework::OpKernel<T> {
        }
      }
    } else if (out_var->IsType<framework::SelectedRows>()) {
-      PADDLE_ENFORCE(!in_place, "SelectedRows not support inplace sum now");
+      std::unique_ptr<framework::SelectedRows> in0;
Review comment (Collaborator, author): Make SumOp support SelectedRows in place.

+      if (in_place) {
+        // If in_place, store input[0] into in0 first.
+        auto &in_sel0 = in_vars[0]->Get<SelectedRows>();
+        auto &rows = in_sel0.rows();
+#ifdef PADDLE_WITH_CUDA
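+        // The row indices may live in device memory when built with CUDA,
+        // so copy them into a host-side std::vector first.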
+        std::vector<int64_t> rows_in_cpu;
+        rows_in_cpu.reserve(rows.size());
+        for (auto item : rows) {
+          rows_in_cpu.push_back(item);
+        }
+        in0.reset(new framework::SelectedRows(rows_in_cpu, in_sel0.height()));
+#else
+        in0.reset(new framework::SelectedRows(rows, in_sel0.height()));
+#endif
+        in0->mutable_value()->ShareDataWith(in_sel0.value());
+      }

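+      // For in-place sums, index 0 must resolve to the saved copy in0,
+      // since in_vars[0] aliases the output that is cleared below.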
+      auto get_selected_row = [&](size_t i) -> const SelectedRows & {
+        if (i == 0 && in0) {
+          return *in0.get();
+        } else {
+          return in_vars[i]->Get<SelectedRows>();
+        }
+      };

auto *out = context.Output<SelectedRows>("Out");
out->mutable_rows()->clear();
auto *out_value = out->mutable_value();

// Runtime InferShape
size_t first_dim = 0;
for (int i = 0; i < N; i++) {
-        first_dim += in_vars[i]->Get<SelectedRows>().rows().size();
+        auto &sel_row = get_selected_row(i);
+        first_dim += sel_row.rows().size();
      }
-      auto in_dim = in_vars[0]->Get<SelectedRows>().value().dims();
-      auto in_dim_vec = framework::vectorize(in_dim);
-      in_dim_vec[0] = static_cast<int64_t>(first_dim);
+      auto in_dim =
+          framework::vectorize(get_selected_row(N - 1).value().dims());
+      in_dim[0] = static_cast<int64_t>(first_dim);

-      out_value->Resize(framework::make_ddim(in_dim_vec));
+      out_value->Resize(framework::make_ddim(in_dim));
      out_value->mutable_data<T>(context.GetPlace());

      math::SelectedRowsAddTo<DeviceContext, T> functor;

      int64_t offset = 0;
      for (int i = 0; i < N; i++) {
Review comment (Member): If in_place=True, the first one should be skipped.

Review comment (Member): And the value of offset is also related to the value of in_place.

Review comment (Member): I get the logic of the in-place support for SelectedRows.

-        PADDLE_ENFORCE_EQ(out->height(),
-                          in_vars[i]->Get<SelectedRows>().height());
-        functor(context.template device_context<DeviceContext>(),
-                in_vars[i]->Get<SelectedRows>(), offset, out);
-        offset += in_vars[i]->Get<SelectedRows>().value().numel();
+        auto &sel_row = get_selected_row(i);
+
+        PADDLE_ENFORCE_EQ(out->height(), sel_row.height());
+        functor(context.template device_context<DeviceContext>(), sel_row,
+                offset, out);
+        offset += sel_row.value().numel();
      }
    } else if (out_var->IsType<framework::LoDTensorArray>()) {
      auto &out_array = *out_var->GetMutable<framework::LoDTensorArray>();
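For intuition about what this kernel computes: summing SelectedRows concatenates the row indices of all inputs and stacks their value tensors at increasing offsets (duplicate indices are left to be merged later), and with in_place the output variable aliases input[0], which is why the kernel snapshots input[0] into in0 before clearing out. Below is a minimal NumPy sketch of that behavior; sum_selected_rows and the (rows, value, height) triples are illustrative stand-ins, not Paddle's API.

import numpy as np


def sum_selected_rows(inputs, in_place=False):
    # Each input is a (rows, value, height) triple: `rows` lists the row
    # indices present in a sparse tensor of `height` total rows, and
    # `value` holds one dense row per listed index.
    if in_place:
        # Mirror the kernel's in0: snapshot input[0] before the output,
        # which would alias it, is cleared and resized.
        rows0, value0, height0 = inputs[0]
        inputs = [(list(rows0), value0.copy(), height0)] + list(inputs[1:])

    out_rows, chunks = [], []
    height = inputs[0][2]
    for rows, value, h in inputs:
        assert h == height  # cf. PADDLE_ENFORCE_EQ(out->height(), ...)
        out_rows.extend(rows)  # row indices are concatenated, duplicates kept
        chunks.append(value)   # values are appended at increasing offsets
    return out_rows, np.concatenate(chunks, axis=0), height


# Two inputs of height 4 whose dense rows are 2 wide:
a = ([0, 2], np.ones((2, 2)), 4)
b = ([1], 2 * np.ones((1, 2)), 4)
rows, value, _ = sum_selected_rows([a, b], in_place=True)
# rows == [0, 2, 1]; value.shape == (3, 2)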
python/paddle/v2/fluid/tests/book/test_word2vec.py: 139 additions & 70 deletions

@@ -12,76 +12,145 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-import numpy as np
import paddle.v2 as paddle
import paddle.v2.fluid as fluid
+import unittest
+import os

-PASS_NUM = 100
-EMBED_SIZE = 32
-HIDDEN_SIZE = 256
-N = 5
-BATCH_SIZE = 32
-IS_SPARSE = True
-
-word_dict = paddle.dataset.imikolov.build_dict()
-dict_size = len(word_dict)
-
-first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64')
-second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64')
-third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64')
-forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64')
-next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')
-
-embed_first = fluid.layers.embedding(
-    input=first_word,
-    size=[dict_size, EMBED_SIZE],
-    dtype='float32',
-    is_sparse=IS_SPARSE,
-    param_attr='shared_w')
-embed_second = fluid.layers.embedding(
-    input=second_word,
-    size=[dict_size, EMBED_SIZE],
-    dtype='float32',
-    is_sparse=IS_SPARSE,
-    param_attr='shared_w')
-embed_third = fluid.layers.embedding(
-    input=third_word,
-    size=[dict_size, EMBED_SIZE],
-    dtype='float32',
-    is_sparse=IS_SPARSE,
-    param_attr='shared_w')
-embed_forth = fluid.layers.embedding(
-    input=forth_word,
-    size=[dict_size, EMBED_SIZE],
-    dtype='float32',
-    is_sparse=IS_SPARSE,
-    param_attr='shared_w')
-
-concat_embed = fluid.layers.concat(
-    input=[embed_first, embed_second, embed_third, embed_forth], axis=1)
-hidden1 = fluid.layers.fc(input=concat_embed, size=HIDDEN_SIZE, act='sigmoid')
-predict_word = fluid.layers.fc(input=hidden1, size=dict_size, act='softmax')
-cost = fluid.layers.cross_entropy(input=predict_word, label=next_word)
-avg_cost = fluid.layers.mean(x=cost)
-sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
-sgd_optimizer.minimize(avg_cost)
-
-train_reader = paddle.batch(
-    paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE)
-
-place = fluid.CPUPlace()
-exe = fluid.Executor(place)
-feeder = fluid.DataFeeder(
-    feed_list=[first_word, second_word, third_word, forth_word, next_word],
-    place=place)
-
-exe.run(fluid.default_startup_program())
-
-for pass_id in range(PASS_NUM):
-    for data in train_reader():
-        avg_cost_np = exe.run(fluid.default_main_program(),
-                              feed=feeder.feed(data),
-                              fetch_list=[avg_cost])
-        if avg_cost_np[0] < 5.0:
-            exit(0)  # if avg cost is less than 5.0, we consider the code good
-exit(1)

+def main(use_cuda, is_sparse, parallel):
Review comment (Collaborator, author): Change the previous unittest into the function main.

+    if use_cuda and not fluid.core.is_compiled_with_cuda():
+        return
+
+    PASS_NUM = 100
+    EMBED_SIZE = 32
+    HIDDEN_SIZE = 256
+    N = 5
+    BATCH_SIZE = 32
+    IS_SPARSE = is_sparse
+
+    def __network__(words):
+        embed_first = fluid.layers.embedding(
+            input=words[0],
+            size=[dict_size, EMBED_SIZE],
+            dtype='float32',
+            is_sparse=IS_SPARSE,
+            param_attr='shared_w')
+        embed_second = fluid.layers.embedding(
+            input=words[1],
+            size=[dict_size, EMBED_SIZE],
+            dtype='float32',
+            is_sparse=IS_SPARSE,
+            param_attr='shared_w')
+        embed_third = fluid.layers.embedding(
+            input=words[2],
+            size=[dict_size, EMBED_SIZE],
+            dtype='float32',
+            is_sparse=IS_SPARSE,
+            param_attr='shared_w')
+        embed_forth = fluid.layers.embedding(
+            input=words[3],
+            size=[dict_size, EMBED_SIZE],
+            dtype='float32',
+            is_sparse=IS_SPARSE,
+            param_attr='shared_w')
+
+        concat_embed = fluid.layers.concat(
+            input=[embed_first, embed_second, embed_third, embed_forth],
+            axis=1)
+        hidden1 = fluid.layers.fc(input=concat_embed,
+                                  size=HIDDEN_SIZE,
+                                  act='sigmoid')
+        predict_word = fluid.layers.fc(input=hidden1,
+                                       size=dict_size,
+                                       act='softmax')
+        cost = fluid.layers.cross_entropy(input=predict_word, label=words[4])
+        avg_cost = fluid.layers.mean(x=cost)
+        return avg_cost
+
+    word_dict = paddle.dataset.imikolov.build_dict()
+    dict_size = len(word_dict)
+
+    first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64')
+    second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64')
+    third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64')
+    forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64')
+    next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')
+
+    if not parallel:
+        avg_cost = __network__(
+            [first_word, second_word, third_word, forth_word, next_word])
+    else:
+        places = fluid.layers.get_places()
+        pd = fluid.layers.ParallelDo(places)
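+        # ParallelDo splits each fed mini-batch across the available
+        # places: read_input() yields the per-device slice of each
+        # variable, and write_output() gathers the per-device costs,
+        # which are averaged below.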
+        with pd.do():
+            avg_cost = __network__(
+                map(pd.read_input, [
+                    first_word, second_word, third_word, forth_word,
+                    next_word
+                ]))
+            pd.write_output(avg_cost)
+
+        avg_cost = fluid.layers.mean(x=pd())
+
+    sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
+    sgd_optimizer.minimize(avg_cost)
+
+    train_reader = paddle.batch(
+        paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE)
+
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    exe = fluid.Executor(place)
+    feeder = fluid.DataFeeder(
+        feed_list=[first_word, second_word, third_word, forth_word,
+                   next_word],
+        place=place)
+
+    exe.run(fluid.default_startup_program())
+
+    for pass_id in range(PASS_NUM):
+        for data in train_reader():
+            avg_cost_np = exe.run(fluid.default_main_program(),
+                                  feed=feeder.feed(data),
+                                  fetch_list=[avg_cost])
+            if avg_cost_np[0] < 5.0:
+                return
+    raise AssertionError("Cost is too large {0:2.2}".format(avg_cost_np[0]))


+FULL_TEST = os.getenv('FULL_TEST',
+                      '0').lower() in ['true', '1', 't', 'y', 'yes', 'on']
+SKIP_REASON = "Only run minimum number of tests in CI server, to make CI faster"


+class W2VTest(unittest.TestCase):
+    pass


+def inject_test_method(use_cuda, is_sparse, parallel):
+    fn_name = "test_{0}_{1}_{2}".format("cuda" if use_cuda else "cpu",
+                                        "sparse" if is_sparse else "dense",
+                                        "parallel" if parallel else "normal")
+
+    def __impl__(*args, **kwargs):
+        prog = fluid.Program()
+        startup_prog = fluid.Program()
+        scope = fluid.core.Scope()
+        with fluid.scope_guard(scope):
+            with fluid.program_guard(prog, startup_prog):
+                main(use_cuda=use_cuda, is_sparse=is_sparse, parallel=parallel)
+
+    if use_cuda and is_sparse and parallel:
Review comment (Collaborator, author): Only test use_cuda=True, is_sparse=True, parallel=True on our CI server.

+        fn = __impl__
+    else:
+        # skip the other tests on the CI server
Review comment (Collaborator, author): Skip these tests if the environment variable FULL_TEST is not true; it will make our CI faster.

+        fn = unittest.skipUnless(
+            condition=FULL_TEST, reason=SKIP_REASON)(__impl__)
+
+    setattr(W2VTest, fn_name, fn)


+for use_cuda in (False, True):
+    for is_sparse in (False, True):
+        for parallel in (False, True):
+            inject_test_method(use_cuda, is_sparse, parallel)
Review comment (Collaborator, author): inject_test_method adds each generated test method to W2VTest.


+if __name__ == '__main__':
+    unittest.main()
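Usage note: the loop above generates eight test methods, but unless FULL_TEST is set, only test_cuda_sparse_parallel actually runs (and it returns early on machines built without CUDA support). To exercise the whole matrix locally, set the environment variable before running the test, e.g. FULL_TEST=1 python test_word2vec.py.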