Skip to content

Commit

Permalink
fix race when temp space is used in copy & fix instance overwrite in g2c (apache#8867)
Browse files Browse the repository at this point in the history

* fix race when temp space is used in copy

* fix instance overwrite in g2c

* example of g2c

* address comments
  • Loading branch information
ZiyueHuang authored and zhreshold committed Dec 14, 2017
1 parent 6123a1a commit b119192
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

# construct the module
# map the ctx_group attribute to the context assignment
group2ctxs={'dev1':mx.cpu(), 'dev2':[mx.gpu(i) for i in range(num_gpus)]}
group2ctxs={'dev1':[mx.cpu()]*num_gpus, 'dev2':[mx.gpu(i) for i in range(num_gpus)]}
mod = mx.module.Module(symbol=net, context=[mx.cpu()]*num_gpus, data_names=['user', 'item'],
label_names=['score'], group2ctxs=group2ctxs)

Expand Down
2 changes: 1 addition & 1 deletion python/mxnet/module/executor_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def _prepare_group2ctxs(group2ctxs, ctx_len):
should be %d" % ctx_len
return group2ctxs
elif isinstance(group2ctxs, dict):
ret = [{}] * ctx_len
ret = [{} for i in range(ctx_len)]
for k, v in group2ctxs.items():
ctxs = None
if isinstance(v, ctx.Context):
Expand Down
59 changes: 35 additions & 24 deletions src/ndarray/ndarray.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,25 +454,22 @@ inline void CopyFromToDnsImpl(const NDArray& from, const NDArray& to, RunContext

// Make a copy of an NDArray based on storage type
template<typename from_xpu, typename to_xpu>
void CopyFromToImpl(const NDArray& from, const NDArray& to, RunContext rctx) {
void CopyFromToImpl(const NDArray& from, const NDArray& to,
RunContext rctx, const std::vector<Resource>& requested) {
using namespace std;
using namespace mshadow;
// if storage type doesn't match, cast the storage first
auto from_stype = from.storage_type();
auto to_stype = to.storage_type();
const NDArrayStorageType from_stype = from.storage_type();
const NDArrayStorageType to_stype = to.storage_type();
CHECK(from_stype == kDefaultStorage
|| to_stype == kDefaultStorage
|| from_stype == to_stype)
<< "Copying ndarray of stype = " << from_stype
<< " to stype = " << to_stype << " is not supported";
const auto from_ctx = from.ctx();
const auto to_ctx = to.ctx();
const Context from_ctx = from.ctx();
const Context to_ctx = to.ctx();
bool is_train = Imperative::Get()->is_training();
std::vector<Resource> requested;
if (is_same<from_xpu, mshadow::gpu>::value && from_stype != to_stype) {
requested.push_back(ResourceManager::Get()->Request(from_ctx,
ResourceRequest(ResourceRequest::kTempSpace)));
}

OpContext opctx{is_train,
rctx,
engine::CallbackOnComplete(),
Expand Down Expand Up @@ -518,43 +515,57 @@ void CopyFromTo(const NDArray& from, const NDArray& to, int priority) {
CHECK(from.shape().ndim() != 0)
<< "source operands have zero dimension shape";
// important: callback must always capture by value
int a = from.ctx().dev_mask();
int b = to.ctx().dev_mask();
const Context from_ctx = from.ctx();
const int a = from_ctx.dev_mask();
const int b = to.ctx().dev_mask();
std::vector<Engine::VarHandle> const_vars;
if (from.var() != to.var()) const_vars.push_back(from.var());

const NDArrayStorageType from_stype = from.storage_type();
const NDArrayStorageType to_stype = to.storage_type();

std::vector<Engine::VarHandle> mutable_vars(1, to.var());

std::vector<Resource> requested;
if (a == gpu::kDevMask && from_stype != to_stype) {
Resource rsc = ResourceManager::Get()->Request(from_ctx,
ResourceRequest(ResourceRequest::kTempSpace));
requested.push_back(rsc);
mutable_vars.push_back(rsc.var);
}

if (a == cpu::kDevMask && b == cpu::kDevMask) {
Engine::Get()->PushAsync(
[from, to](RunContext ctx, Engine::CallbackOnComplete on_complete) {
CopyFromToImpl<cpu, cpu>(from, to, ctx);
[from, to, requested](RunContext ctx, Engine::CallbackOnComplete on_complete) {
CopyFromToImpl<cpu, cpu>(from, to, ctx, requested);
on_complete();
}, from.ctx(), const_vars, {to.var()},
}, from.ctx(), const_vars, mutable_vars,
FnProperty::kNormal, priority, PROFILER_MESSAGE("CopyCPU2CPU"));
} else {
#if MXNET_USE_CUDA
if (a == cpu::kDevMask && b == gpu::kDevMask) {
Engine::Get()->PushAsync(
[from, to](RunContext ctx, Engine::CallbackOnComplete on_complete) {
CopyFromToImpl<cpu, gpu>(from, to, ctx);
[from, to, requested](RunContext ctx, Engine::CallbackOnComplete on_complete) {
CopyFromToImpl<cpu, gpu>(from, to, ctx, requested);
ctx.get_stream<gpu>()->Wait();
on_complete();
}, to.ctx(), const_vars, {to.var()},
}, to.ctx(), const_vars, mutable_vars,
FnProperty::kCopyToGPU, priority, PROFILER_MESSAGE("CopyCPU2GPU"));
} else if (a == gpu::kDevMask && b == cpu::kDevMask) {
Engine::Get()->PushAsync(
[from, to](RunContext ctx, Engine::CallbackOnComplete on_complete) {
CopyFromToImpl<gpu, cpu>(from, to, ctx);
[from, to, requested](RunContext ctx, Engine::CallbackOnComplete on_complete) {
CopyFromToImpl<gpu, cpu>(from, to, ctx, requested);
ctx.get_stream<gpu>()->Wait();
on_complete();
}, from.ctx(), const_vars, {to.var()},
}, from.ctx(), const_vars, mutable_vars,
FnProperty::kCopyFromGPU, priority, PROFILER_MESSAGE("CopyGPU2CPU"));
} else if (a == gpu::kDevMask && b == gpu::kDevMask) {
Engine::Get()->PushAsync(
[from, to](RunContext ctx, Engine::CallbackOnComplete on_complete) {
CopyFromToImpl<gpu, gpu>(from, to, ctx);
[from, to, requested](RunContext ctx, Engine::CallbackOnComplete on_complete) {
CopyFromToImpl<gpu, gpu>(from, to, ctx, requested);
ctx.get_stream<gpu>()->Wait();
on_complete();
}, from.ctx(), const_vars, {to.var()},
}, from.ctx(), const_vars, mutable_vars,
from.dtype() != to.dtype() ? FnProperty::kNormal : FnProperty::kCopyFromGPU,
priority, PROFILER_MESSAGE("CopyGPU2GPU"));
} else {
Expand Down
2 changes: 1 addition & 1 deletion tests/python/gpu/test_operator_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from test_sparse_ndarray import test_create_csr, test_create_row_sparse, test_sparse_nd_slice
from test_sparse_ndarray import test_create_sparse_nd_empty, test_create_sparse_nd_from_sparse
from test_sparse_ndarray import test_create_sparse_nd_from_dense, test_create_sparse_nd_infer_shape
from test_sparse_ndarray import test_sparse_nd_check_format
from test_sparse_ndarray import test_sparse_nd_check_format, test_sparse_nd_copy
from test_sparse_operator import *
from test_ndarray import *

Expand Down
11 changes: 7 additions & 4 deletions tests/python/unittest/test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_module_input_grads():


def test_module_ctx_group():
def check_module_ctx_group(ctxs, group2ctxs):
def check_module_ctx_group(ctxs, group2ctxs, grad_ctxs=None):
with mx.AttrScope(ctx_group='dev1'):
a = mx.symbol.Variable('a')
a = a * 2
Expand All @@ -94,10 +94,13 @@ def check_module_ctx_group(ctxs, group2ctxs):
mod2.backward([mx.nd.ones(shape)])
mod2_input_grads = mod2.get_input_grads()

assert np.all(mod1_input_grads[0].asnumpy() == mod2_input_grads[0].asnumpy())
assert np.all(mod1_input_grads[1].asnumpy() == mod2_input_grads[1].asnumpy())
if grad_ctxs is not None:
assert(mod1_input_grads[0].context == grad_ctxs[0])
assert(mod1_input_grads[1].context == grad_ctxs[1])
assert(np.all(mod1_input_grads[0].asnumpy() == mod2_input_grads[0].asnumpy()))
assert(np.all(mod1_input_grads[1].asnumpy() == mod2_input_grads[1].asnumpy()))

check_module_ctx_group([mx.cpu(0)], {'dev1': mx.cpu(1), 'dev2': mx.cpu(2)})
check_module_ctx_group([mx.cpu(0)], {'dev1': mx.cpu(1), 'dev2': mx.cpu(2)}, grad_ctxs=[mx.cpu(1), mx.cpu(2)])
check_module_ctx_group([mx.cpu(0), mx.cpu(1)],
[{'dev1': mx.cpu(2), 'dev2': mx.cpu(3)}, {'dev1': mx.cpu(4), 'dev2': mx.cpu(5)}])
check_module_ctx_group([mx.cpu(0), mx.cpu(1)], {'dev1': mx.cpu(2), 'dev2': mx.cpu(3)})
Expand Down

0 comments on commit b119192

Please sign in to comment.