Cpp parallel executor #9080
Conversation
doc/design/parallel_executor.md
opt = fluid.optimizer.SGDOptimizer()
opt.minimize(avg_cost)

# change Executor -> ParallelExecutor
Maybe we can let the user still use the Executor interface and add an optional argument gpu_list; under the hood, if multiple GPUs are available (either len(gpu_list) > 0, or gpu_list == None with multiple GPUs initialized), create and return the parallel executor instance.
doc/design/parallel_executor.md
// e.g. sgd should wait for allreduce to be finished
CallBack->BeforeOp(op);

op->Run(*local_scope, place_);
From my understanding, the reason we need the callback is that ParallelExecutor will call Executor::Run, but needs to be notified before and after each Op::Run. Do we even need the Executor implementation anymore? Maybe we can consolidate them into a single executor, so that we don't need the callback anymore. It would also be easier for the Python side: Python would always create the same executor.
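For illustration, a minimal sketch of the callback idea under discussion; ExecutorCallBack, before_op/after_op, and RunOps are hypothetical names, not the interface proposed in this PR:

```cpp
#include <functional>
#include <vector>

// Hypothetical stand-in for framework::OperatorBase.
struct OperatorBase;

// The hook interface the design doc's CallBack refers to: the parallel
// executor gets notified around every Op::Run so it can insert
// synchronization (e.g. make SGD wait for the allreduce to finish).
struct ExecutorCallBack {
  std::function<void(OperatorBase*)> before_op;
  std::function<void(OperatorBase*)> after_op;
};

// If the executors were consolidated as suggested, a single run loop could
// take an optional callback (nullptr for the plain, single-device case).
inline void RunOps(const std::vector<OperatorBase*>& ops,
                   ExecutorCallBack* cb /* may be nullptr */) {
  for (auto* op : ops) {
    if (cb && cb->before_op) cb->before_op(op);
    // op->Run(scope, place) would go here.
    if (cb && cb->after_op) cb->after_op(op);
  }
}
```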
std::vector<OpHandle *> to_run;
for (auto *var : to_remove) {
  for (auto *op : var->pending_ops_) {
    if (var->name_ == "mean_0.tmp_0@GRAD") {
what is the purpose of this special case?
Just debug code... Sorry
struct OpHandle {
  std::vector<VarHandle *> inputs_;
  std::vector<VarHandle *> outputs_;
  platform::DeviceContext *dev_ctx_;
Add framework::Scope* scope_?
}

std::vector<LoDTensor> ParallelExecutor::Run(
    const std::vector<std::string> &fetch_tensors) {
Instantiate Variables here?
Paddle/paddle/fluid/framework/executor.cc, lines 276 to 305 in 41894da:
Scope* local_scope = scope;
if (create_vars) {
  if (create_local_scope) {
    local_scope = &scope->NewScope();
    for (auto& var : block.AllVars()) {
      if (var->Name() == framework::kEmptyVarName) {
        continue;
      }
      if (var->Persistable()) {
        auto* ptr = scope->Var(var->Name());
        CreateTensor(ptr, var->GetType());
        VLOG(3) << "Create Variable " << var->Name()
                << " global, which pointer is " << ptr;
      } else {
        auto* ptr = local_scope->Var(var->Name());
        CreateTensor(ptr, var->GetType());
        VLOG(3) << "Create Variable " << var->Name()
                << " locally, which pointer is " << ptr;
      }
    }
  } else {
    for (auto& var : block.AllVars()) {
      auto* ptr = local_scope->Var(var->Name());
      CreateTensor(ptr, var->GetType());
      VLOG(3) << "Create variable " << var->Name() << ", which pointer is "
              << ptr;
    }
  }  // if (create_local_scope)
}    // if (create_vars)
Yes. We need to instantiate variables here. We might extract this routine to a global function.
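For illustration, a sketch of what that extraction might look like, reusing only the calls that appear in the quoted snippet; the free-function name and signature are hypothetical, not what the PR actually adds:

```cpp
// Hypothetical helper extracted from Executor::Run: create every variable of
// `block`, putting persistable ones in the global scope and the rest in the
// local scope, so both Executor and ParallelExecutor can reuse it.
static void CreateVariablesInScope(const framework::BlockDesc& block,
                                   framework::Scope* scope,
                                   framework::Scope* local_scope) {
  for (auto& var : block.AllVars()) {
    if (var->Name() == framework::kEmptyVarName) continue;
    auto* target = var->Persistable() ? scope : local_scope;
    auto* ptr = target->Var(var->Name());
    CreateTensor(ptr, var->GetType());
    VLOG(3) << "Create Variable " << var->Name() << ", which pointer is "
            << ptr;
  }
}
```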
  }
};

member_->pool_.Run(op_run);
Actually, we should add a callback after we push the operator run job to the thread pool. In this callback, we change the pending_var state.
The pending_var state has already been changed at L404-L406.
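For illustration, a generic sketch of the pattern being discussed; RunOpAsync and the member layout are illustrative stand-ins, not the PR's actual code, and a real implementation would need to synchronize access to pending_vars:

```cpp
#include <functional>
#include <unordered_map>
#include <vector>

struct VarHandleBase;  // stand-in for the PR's variable handle
struct OpHandle {      // stand-in with just the fields this sketch needs
  std::vector<VarHandleBase*> outputs_;
  void Run() { /* launch the wrapped operator */ }
};

// Enqueue an operator; when it completes, flip the flag of its outputs so the
// scheduler can release the operators that were waiting on them.
template <typename ThreadPool>
void RunOpAsync(ThreadPool* pool,
                std::unordered_map<VarHandleBase*, bool>* pending_vars,
                OpHandle* op) {
  pool->Run([=] {
    op->Run();
    for (auto* out : op->outputs_) {
      // The "callback" the review mentions: mark the produced variables as
      // ready (illustrative; the PR's flag semantics may differ).
      (*pending_vars)[out] = true;
    }
  });
}
```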
    member_->local_scopes_.size() != 1) {  // Is CUDA
  BuildNCCLCommunicator();
  BCastParamsToGPUs(startup_program);
}
Why not initialize the parameters on their respective devices?
Because the random initialization results might not be the same across devices when seed = 0.
void RunOp(std::unordered_map<VarHandleBase*, bool>& pending_vars,
           OpHandle* op) const;

void PolishGraphToSupportDataHarzaeds() const;
DataHarzaeds --> DataHazards
result.vars_.resize(places_.size());

bool is_forwarding = true;
for (auto *op : program.Block(0).AllOps()) {
Yes. The operators in a sub-block will be executed by a control flow operator, e.g., While, so only the operators of Block(0) are traversed here. The behaviour of control flow operators and computational operators should be the same.
I have finished my review. @chengduoZH Let's verify the correctness and speed of Transformer and ResNeXt. If they are OK, let's merge it soon so that everyone can start improving it.
namespace details {

struct FetchOpHandle : public OpHandleBase {
  FeedFetchList *data_;
Let's add a "private:" then?
for (auto *op : program.Block(0).AllOps()) {
  bool change_forward = false;
  if (!is_forwarding) {
    // FIXME(yy): Do not hard code like this
I see. Can you add your reply as comments in the code?
@@ -79,7 +79,18 @@ void* GPUAllocator::Alloc(size_t& index, size_t size) {
  // if size is 0. We just make sure it does.
  if (size <= 0) return nullptr;
  void* p;
  int prev_id;
I would suggest making this a PADDLE_ENFORCE if the current behavior works. Otherwise, readers will think the Allocator currently works on multiple GPUs.
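A rough sketch of the check being suggested; gpu_id_ is an assumed name for the allocator's device id and the message is illustrative:

```cpp
// Instead of silently switching devices, assert that the allocation already
// happens on the device this allocator was created for.
int prev_id = -1;
PADDLE_ENFORCE(cudaGetDevice(&prev_id) == cudaSuccess);
PADDLE_ENFORCE_EQ(prev_id, gpu_id_,
                  "GPUAllocator::Alloc is expected to run on its own device");
```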
  PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
}

void FetchOpHandle::WaitAndMergeCPUTensors() const {
Does this really wait on anything? Maybe just name it MergeCPUTensors?
// FIXME: Currently ScaleLossGradOp only use device_count as scale
// factor. So it does not depend on any other operators.
// VarHandle *loss = GetVarHandle(loss_var_name, place);
// loss->pending_ops_.emplace_back(op_handle);
If this op doesn't depend on anything, when will it be scheduled?
It will be run in the very first wave: an operator with no pending inputs is ready immediately.
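For illustration, a small sketch of that startup step; ready_ops mirrors the set shown later in this PR, while all_ops and the exact member layout are assumptions:

```cpp
// During startup, an op with no inputs (e.g. ScaleLossGradOp, which only
// uses the device count) has nothing to wait for, so it goes straight into
// ready_ops and is dispatched before anything else.
for (auto *op : all_ops) {
  if (op->inputs_.empty()) {
    ready_ops.insert(op);
  }
}
```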
 *
 * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR)
 */
static void PolishGraphToSupportDataHazards(SSAGraph *graph);
Add more comments to describe when data hazards happen?
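For example (an illustrative scenario, not code from this PR), a comment along these lines could be added next to the declaration:

```cpp
// Write-after-read (WAR) hazard in the SSA graph:
//
//   op A reads  w@GRAD (version 0)
//   op B writes w@GRAD (version 1), reusing the same buffer
//
// Nothing in the pure data-flow edges forces A to finish before B starts, so
// B may clobber the buffer while A is still reading it.
// PolishGraphToSupportDataHazards inserts a dummy dependency variable
// (read_op -> dep_var -> write_op) to serialize such pairs.
```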
std::unordered_set<OpHandleBase *> ready_ops;

auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
  pending_vars.insert(&var);
should this be skipped if generated_op_ is nullptr?
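A sketch of what that skip might look like; whether ready_vars accepts an insert like this is an assumption, and the rest mirrors the snippet above:

```cpp
auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
  // A variable with no generating operator (e.g. an input that already
  // exists) will never be "produced", so it is ready from the start.
  if (var.generated_op_ == nullptr) {
    ready_vars.insert(&var);
  } else {
    pending_vars.insert(&var);
  }
};
```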
@@ -29,7 +29,7 @@ void FileReader::ReadNext(std::vector<LoDTensor> *out) {

  PADDLE_ENFORCE_EQ(actual.size(), expect.size());
  for (int j = 0; j < actual.size(); ++j) {
    PADDLE_ENFORCE(actual[i] == expect[i] || expect[i] == -1);
    // PADDLE_ENFORCE(actual[i] == expect[i] || expect[i] == -1);
update this line?
// Create local scopes.
for (auto &scope : local_scopes_) {
  auto &local_scope = scope->NewScope();
  *scope->Var("@TMP_SCOPE@")->GetMutable<Scope *>() = &local_scope;
Why are all the scopes using the same var name?
During Op::Run, some temporary variables will be created in the local scopes. So here we just use the variable @TMP_SCOPE@ to hold these temporary variables; they will be destroyed after a period.
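For illustration, a sketch of that lifecycle; the cleanup loop and the use of Scope::DeleteScope are assumptions about how "destroyed after a period" could be implemented, not the PR's exact code:

```cpp
// Before running: give every device scope a child scope for temporaries,
// reachable through the well-known variable name "@TMP_SCOPE@".
for (auto &scope : local_scopes_) {
  auto &local_scope = scope->NewScope();
  *scope->Var("@TMP_SCOPE@")->GetMutable<Scope *>() = &local_scope;
}

// ... run the operators; their temporaries land in the child scopes ...

// Later ("after a period"): drop each child scope, which frees all of the
// temporary variables it accumulated in one go.
for (auto &scope : local_scopes_) {
  auto *tmp_scope = *scope->Var("@TMP_SCOPE@")->GetMutable<Scope *>();
  scope->DeleteScope(tmp_scope);
}
```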
auto *dep_var = new DummyVarHandle();
read_op->AddOutput(dep_var);
write_op->AddInput(dep_var);
graph->dep_vars_.emplace(dep_var);
should this be called data_hazard_vars_?
Comments have been added.
I just added a dependency engine to parse the dependencies of operators. There are still a lot of jobs that need to be done:
- Complete broadcasting parameters.
- Use a thread pool to invoke operators in parallel.
- Complete the NCCL AllReduce OpHandle.

In this implementation, I just use VarHandle and OpHandle to parse the Program into an SSA-form graph. A variable is assigned by only one OpHandle. When all inputs of an OpHandle are ready, the OpHandle can be run.

The speed of ResNeXt152 is
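For readers new to the PR, here is a self-contained, single-threaded sketch of the execution rule described above (each variable version has exactly one producer; an OpHandle runs once all of its inputs are ready). The structs and the loop are illustrative, not the actual classes in this PR:

```cpp
#include <cstddef>
#include <queue>
#include <unordered_map>
#include <vector>

struct OpHandle;

struct VarHandle {
  OpHandle* generated_op = nullptr;    // SSA: a single producer per version
  std::vector<OpHandle*> pending_ops;  // consumers waiting on this value
};

struct OpHandle {
  std::vector<VarHandle*> inputs;
  std::vector<VarHandle*> outputs;
  void Run() { /* execute the wrapped operator */ }
};

// Dependency engine sketch: run everything whose inputs are ready, then
// propagate readiness through the variables each op produces.
inline void RunGraph(const std::vector<OpHandle*>& ops) {
  std::unordered_map<OpHandle*, std::size_t> pending_inputs;
  std::queue<OpHandle*> ready;
  for (auto* op : ops) {
    pending_inputs[op] = op->inputs.size();
    if (op->inputs.empty()) ready.push(op);  // e.g. ScaleLossGradOp
  }
  while (!ready.empty()) {
    auto* op = ready.front();
    ready.pop();
    op->Run();
    for (auto* out : op->outputs) {          // outputs become ready
      for (auto* consumer : out->pending_ops) {
        if (--pending_inputs[consumer] == 0) ready.push(consumer);
      }
    }
  }
}
```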