This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

rsp push and rsp pull for comm device, used in kvstore('device') #8732

Merged — 35 commits merged into apache:master on Jan 15, 2018

Conversation

ZiyueHuang (Member)

Description

Although the added test passes on my machine (CentOS 7, GTX 1080 GPU with 8 GB), the corresponding test in master is currently skipped and waiting to be fixed.

cc @eric-haibin-lin
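
For context, a minimal usage sketch of the workflow this PR enables (assuming MXNet's Python API and two visible GPUs; all names and shapes are illustrative):

```python
import mxnet as mx

shape = (100, 50)
kv = mx.kv.create('device')  # communication happens on GPU with kvstore('device')
kv.init('w', mx.nd.zeros(shape, stype='row_sparse'))

# each device pushes a row_sparse gradient
grads = [mx.nd.ones(shape, ctx=mx.gpu(i)).tostype('row_sparse') for i in range(2)]
kv.push('w', grads)

# pull back only the rows each device actually needs
row_ids = mx.nd.array([0, 2, 5], dtype='int64')
outs = [mx.nd.zeros(shape, ctx=mx.gpu(i), stype='row_sparse') for i in range(2)]
kv.row_sparse_pull('w', out=outs, row_ids=[row_ids, row_ids])
```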

Checklist

Essentials

  • Passed code style checking (make lint)
  • Changes are complete (i.e. I finished coding on this PR)
  • All changes have test coverage
  • For user-facing API changes, API doc string has been updated. For new C++ functions in header files, their functionalities and arguments are well-documented.
  • To my best knowledge, examples are either not affected by this change, or have been fixed to be compatible with this change

Changes

  • rsp push and rsp pull for comm device

Comments

  • If this change is a backward incompatible change, why must this change be made.
  • Interesting edge cases to note here

@eric-haibin-lin eric-haibin-lin self-assigned this Nov 21, 2017
@ZiyueHuang ZiyueHuang changed the title [WIP] rsp push and rsp pull for comm device rsp push and rsp pull for comm device Nov 27, 2017
@ZiyueHuang ZiyueHuang changed the title rsp push and rsp pull for comm device rsp push and rsp pull for comm device, used in kvstore('device') Nov 27, 2017
@eric-haibin-lin eric-haibin-lin (Member) left a comment:

ping @reminisce @rahul003 for further reviews

int type = std::get<2>(sorted_key_attrs_[i]);
NDArrayStorageType stype = std::get<3>(sorted_key_attrs_[i]);
Member:

nit: const?

const_vars[i] = reduce[i].var();
}
auto result = buf.merged;
Engine::Get()->PushAsync(
Member:

Should this be moved to ndarray.cc instead?

@ZiyueHuang ZiyueHuang (Member Author), Nov 28, 2017:

Why should this be moved into ndarray.cc? I think it is fine here, i.e. pushing the operation into the engine in comm.h.

Member:

Can we extend the ElementwiseSum function in https://github.com/apache/incubator-mxnet/blob/master/src/ndarray/ndarray.cc#L574 to handle row-sparse cases?

case gpu::kDevMask: {
mxnet::ndarray::ElementwiseSum(rctx.get_stream<gpu>(), rsc, reduce, &out);
break;
}
Member:

Is ctx.get_stream<gpu>()->Wait(); missing when CUDA is used?

Member Author:

MXNET_USE_CUDA is already checked on line 575.

mxnet::common::SparseRetainOpForwardRspWrapper<gpu>(rctx.get_stream<gpu>(),
src_gpu, indices, kWriteTo, &temp);
break;
}
Member:

Is Stream->Wait() missing?

# single
kv.init('a', mx.nd.zeros(shape, stype=stype))
# list
kv.init(str_keys, [mx.nd.zeros(shape=shape, stype=stype)] * len(keys))
return kv


@unittest.skip("Test fails intermittently. Temporarily disabled until fixed. Tracked at https://github.com/apache/incubator-mxnet/issues/8262")
Member:

The test should be fixed in #8838


# single
kv.init('a', mx.nd.zeros(shape, stype=stype))
# list
kv.init(str_keys, [mx.nd.zeros(shape=shape, stype=stype)] * len(keys))
return kv


def test_row_sparse_pull():
kv = init_kv_with_str('row_sparse')
def test_row_sparse_pull(kv_type='local'):
Member:

I think nosetests will not run the code in __main__ but rather looks for functions starting with test_. Shall we test both local and device in test_row_sparse_pull?

Member:

On second thought, we should have a separate test for kv=device with rsp values on different contexts. Using GPU ctxs is fine; CI should have more than one GPU.
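
A minimal sketch of such a test (the helper name and sizes are hypothetical; it assumes at least two visible GPUs):

```python
import mxnet as mx
import numpy as np

def check_rsp_pull_device(num_rows=100, num_cols=10):
    shape = (num_rows, num_cols)
    kv = mx.kv.create('device')
    kv.init('w', mx.nd.ones(shape).tostype('row_sparse'))
    # outputs live on different GPU contexts
    ctxs = [mx.gpu(i // 2) for i in range(4)]
    outs = [mx.nd.zeros(shape, ctx=ctx, stype='row_sparse') for ctx in ctxs]
    row_id = mx.nd.array(np.random.randint(num_rows, size=num_rows), dtype='int64')
    kv.row_sparse_pull('w', out=outs, row_ids=[row_id] * len(outs))
    # only the requested rows should be retained; the rest stay zero
    expected = np.ones(shape)
    expected[np.setdiff1d(np.arange(num_rows), row_id.asnumpy())] = 0
    for out in outs:
        assert np.allclose(out.asnumpy(), expected)
```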

} else {
CHECK_EQ(out->storage_type(), kRowSparseStorage)
<< "BroadcastRowSparse expects row_sparse dst NDArray";
const bool is_diff_ctx = out->ctx() != src.ctx();
Member:

Are we assuming src is always on GPU?
If so, should we perform retain first before copying it to other devices?

Member Author:

src is not assumed to be on GPU; actually, src is always on CPU. As you can see in https://github.com/apache/incubator-mxnet/blob/master/src/kvstore/kvstore_local.h#L233, src is local_[key], and local_[key] is initialized on pinned_ctx_, which is always CPU: https://github.com/apache/incubator-mxnet/blob/master/src/kvstore/kvstore_local.h#L152.

Member:

That's true at the beginning. But as soon as you push some gradients on GPU, it copies the weight from pinned_ctx to GPU. See
https://github.com/apache/incubator-mxnet/blob/master/src/kvstore/kvstore_local.h#L173

Member:

Nonetheless, I think performing sparse retain before the copy makes more sense since the source array is usually very large.

@ZiyueHuang ZiyueHuang (Member Author) commented Nov 28, 2017:

@eric-haibin-lin Yes, I think ElementwiseSum in ndarray.cc can be extended to handle rsp ndarrays, but this line, Resource rsc = ResourceManager::Get()->Request(rctx.ctx, ResourceRequest(ResourceRequest::kTempSpace));, should be inserted into ElementwiseSum since temp space is needed for the rsp cases. Is this OK?

@eric-haibin-lin (Member):

Can the resource request also be added in ndarray.cc? Others may use ElementwiseSum as a black box. Also see https://github.com/apache/incubator-mxnet/blob/master/src/ndarray/ndarray.cc#L667, which requests a temp resource.

@ZiyueHuang (Member Author):

Got it. I think it is OK. Thanks for the reference!

@@ -215,6 +215,13 @@ void CheckFormatImpl(const RunContext &rctx, const NDArray &input,
}


template<typename xpu>
Member:

Let's make sure functions in .h are documented. Should add some description for CastStorageDispatch too...

CHECK_EQ(src.storage_type(), kRowSparseStorage)
<< "BroadcastRowSparse expects row-sparse src NDArray";

bool is_same_rowid = true;
Member:

Please add some brief description explaining the optimization


if (is_diff_ctx) {
CopyFromTo(src, &src_gpu, priority);
}
NDArray row_id_gpu = NDArray(row_id.shape(), out->ctx(), false, mshadow::kInt64);
Member:

Does it still work if the user provides outputs on the CPU device?

Member Author:

Yes. More unit tests have been added.

check_rsp_pull(kv, 1, [mx.gpu(0)])
check_rsp_pull(kv, 4, [mx.gpu(i//2) for i in range(4)])
check_rsp_push_pull('local')
check_rsp_push_pull('device')
Member:

I think we should have test cases that at least cover the following for kvstore=device (see the sketch below):
push cpu then rsp_pull cpu
push gpu then rsp_pull gpu
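
A rough sketch of what such cases could look like (the helper name is hypothetical; it assumes no optimizer is attached to the kvstore, so a pull returns the summed pushed value):

```python
import mxnet as mx

def check_push_then_rsp_pull(push_ctx, pull_ctx, shape=(8, 4)):
    kv = mx.kv.create('device')
    kv.init('w', mx.nd.zeros(shape, stype='row_sparse'))
    # push a row_sparse gradient from push_ctx ...
    kv.push('w', mx.nd.ones(shape, ctx=push_ctx).tostype('row_sparse'))
    # ... then pull all rows back onto pull_ctx
    out = mx.nd.zeros(shape, ctx=pull_ctx, stype='row_sparse')
    row_ids = mx.nd.arange(0, shape[0], dtype='int64')
    kv.row_sparse_pull('w', out=out, row_ids=row_ids)
    assert (out.asnumpy() == 1).all()

check_push_then_rsp_pull(mx.cpu(), mx.cpu())    # push cpu then rsp_pull cpu
check_push_then_rsp_pull(mx.gpu(0), mx.gpu(0))  # push gpu then rsp_pull gpu
```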

})
}

void CopyRetainedRows(RunContext rctx,
Member:

Please add a brief comment.

check_rsp_pull(kv, 4, [mx.gpu(i//2) for i in range(4)])
check_rsp_pull(kv, 4, [mx.cpu(i) for i in range(4)])

check_rsp_push_pull('local')
Member:

Do we have a test case where the same row_id is used for rsp_pull?

Member Author:

Added

CHECK_EQ(src.storage_type(), kRowSparseStorage)
<< "BroadcastRowSparse expects row-sparse src NDArray";

// whether the indices are the same
Member:

This code is duplicated in comm.h and kvstore_local.h. Shall we move it to util.h?

Member Author:

Moved

@ZiyueHuang ZiyueHuang (Member Author) commented Dec 17, 2017:

Benchmark with the GPU unique implementation; the same row id is used on every GPU.

| batch size | Device | samples/sec |
| --- | --- | --- |
| 16384 | 1 gpu | 2.8 M |
| 16384 * 2 | 2 gpu | 3.3 M |
| 16384 * 4 | 4 gpu | 4.3 M |
| 16384 * 8 | 8 gpu | 5.6 M |

@ZiyueHuang ZiyueHuang (Member Author) commented Dec 17, 2017:

profile.zip

After replacing mx.nd.waitall with wait_to_read():

| batch size | Device | samples/sec |
| --- | --- | --- |
| 16384 | 1 gpu | 3 M |
| 16384 * 2 | 2 gpu | 3.7 M |
| 16384 * 4 | 4 gpu | 5 M |
| 16384 * 8 | 8 gpu | 6.5 M |

Profiler timelines for 1, 2, 4, and 8 GPUs (screenshots omitted).

@ZiyueHuang (Member Author):

export MXNET_GPU_TEMP_COPY=4

| batch size | Device | samples/sec |
| --- | --- | --- |
| 16384 | 1 gpu | 3.4 M |
| 16384 * 2 | 2 gpu | 4.4 M |
| 16384 * 4 | 4 gpu | 6.7 M |
| 16384 * 8 | 8 gpu | 9.1 M |

Profiler timelines for 1, 2, 4, and 8 GPUs (screenshots omitted).

profile.zip

const TBlob& rowid_i = val_rowids[i].second.data();
if (rowid_i.dptr<IType>() != first_dptr
|| rowid_i.Size() != first_size) {
is_same_rowid = false;
Member:

nit: we can return false directly if they don't match

}, ret.ctx(), const_vars, {ret.var(), rsc.var},
FnProperty::kNormal, priority, PROFILER_MESSAGE("RowSparseElementwiseSum"));
} else {
LOG(FATAL) << "Not implemented for storage_type " << stype;
Member:

<< common::stype_string(stype);


check_row_sparse_pull(kv, 1, mx.gpu(0))
check_row_sparse_pull(kv, 4, mx.gpu(0))
check_rsp_pull(kv, 1, [mx.gpu(0)])
Member:

Can we also add support for passing a list of values with a single rowid?

Member Author:

Added
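
A usage sketch of that case (a single row_ids NDArray shared by a list of outputs; it assumes two visible GPUs and illustrative names):

```python
import mxnet as mx

shape = (100, 20)
kv = mx.kv.create('device')
kv.init('w', mx.nd.ones(shape).tostype('row_sparse'))

row_ids = mx.nd.array([0, 3, 7], dtype='int64')
outs = [mx.nd.zeros(shape, ctx=mx.gpu(i), stype='row_sparse') for i in range(2)]
# one row_ids NDArray shared by a list of output values
kv.row_sparse_pull('w', out=outs, row_ids=row_ids)
```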

for i in range(count):
row_id = np.random.randint(num_rows, size=num_rows)
row_ids.append(mx.nd.array(row_id, dtype='int64'))
row_ids_to_pull = row_ids[0] if (len(row_ids) == 1 or is_same_rowid) else row_ids
Member Author:

The test here covers a single rowid with multiple vals.

@ZiyueHuang (Member Author):

Anything to address? @reminisce

} else {
LOG(FATAL) << "storage type " << stype << " not implemented for device yet";
}
sorted_key_attrs_.push_back(std::make_tuple(key, shape, dtype, stype));
Contributor:

Using emplace_back(key, shape, dtype, stype) can avoid constructing a temporary tuple object.

@@ -681,8 +749,9 @@ class CommDevice : public Comm {
}
for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) {
int key = std::get<0>(sorted_key_attrs_[i]);
Contributor:

const int

@@ -681,8 +749,9 @@ class CommDevice : public Comm {
}
for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) {
int key = std::get<0>(sorted_key_attrs_[i]);
TShape s = std::get<1>(sorted_key_attrs_[i]);
TShape shape = std::get<1>(sorted_key_attrs_[i]);
Contributor:

const TShape&

@@ -681,8 +749,9 @@ class CommDevice : public Comm {
}
for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) {
int key = std::get<0>(sorted_key_attrs_[i]);
TShape s = std::get<1>(sorted_key_attrs_[i]);
TShape shape = std::get<1>(sorted_key_attrs_[i]);
int type = std::get<2>(sorted_key_attrs_[i]);
Contributor:

const int

bool CheckSameRowid(
const std::vector<std::pair<NDArray*, NDArray>>& val_rowids) {
MSHADOW_TYPE_SWITCH(val_rowids[0].second.dtype(), IType, {
const TBlob& rowid_first = val_rowids[0].second.data();
Member:

Accessing data() outside the engine is dangerous. We can compare NDArray::ptr_ and offset instead.

row_ids = [row_ids]
assert(isinstance(row_ids, list)), \
"row_ids should be NDArray or list of NDArray"
out_val = out
Member:

I'd prefer first_out to out_val. I also recommend documenting the optimization upfront instead of at the end of the function:
"When there is only one row_id, we can invoke KVStoreRowSparsePull just once and broadcast the result to all the rest of the outputs."

Member Author:

Comments have been added to the doc string.

"row_ids should be NDArray or list of NDArray"
out_val = out
# whether row_ids are the same
is_same_rowid = False
Member:

prefer single_rowid to is_same_rowid

@eric-haibin-lin eric-haibin-lin merged commit 786e376 into apache:master Jan 15, 2018
CodingCat pushed a commit to CodingCat/mxnet that referenced this pull request Jan 16, 2018
rsp push and rsp pull for comm device, used in kvstore('device') (apache#8732)

* comm device for rsp push and pull

* update

* update test

* optimization for same row_ids

* add stream->wait

* remove using space

* fix race of rsc and extend ElementwiseSum to rsp cases

* add log fatal in ElementwiseSum

* direct copy rows if full rsp and put all outputs on ctx of src

* trigger

* fix

* simplify copy

* move check same rowids to utils and add test for same rowids case

* remove direct copy row by row

* fix checkSameRowid

* gpu unique impl draft

* unique

* update

* fix windows build

* trigger windows build

* support single rowid with multiple vals

* address comments

* check same row_ids and copy in fronted

* revise names and disable test for local kvstore
yuxiangw pushed a commit to yuxiangw/incubator-mxnet that referenced this pull request Jan 25, 2018
@ZiyueHuang ZiyueHuang deleted the comm_device branch January 30, 2018 11:30
rahul003 pushed a commit to rahul003/mxnet that referenced this pull request Jun 4, 2018
zheng-da pushed a commit to zheng-da/incubator-mxnet that referenced this pull request Jun 28, 2018
3 participants