Skip to content

Commit

Permalink
Merge pull request #11233 from tensor-tang/multithreads
Browse files Browse the repository at this point in the history
Fix abort issue in cpu multi-threads
  • Loading branch information
tensor-tang authored Jun 6, 2018
2 parents 4f95bc9 + 4b7b17a commit 3a29404
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 35 deletions.
12 changes: 11 additions & 1 deletion paddle/fluid/framework/scope.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Scope& Scope::NewScope() const {
}

Variable* Scope::Var(const std::string& name) {
// acquire the lock when new var under this scope
std::unique_lock<std::mutex> lock(mutex_);
auto* v = FindVarLocally(name);
if (v != nullptr) return v;

Expand All @@ -62,11 +64,17 @@ Variable* Scope::Var(std::string* name) {
}

Variable* Scope::FindVar(const std::string& name) const {
// acquire the lock when find var
std::unique_lock<std::mutex> lock(mutex_);
return FindVarInternal(name);
}

Variable* Scope::FindVarInternal(const std::string& name) const {
auto var = FindVarLocally(name);
if (var != nullptr) {
return var;
}
return (parent_ == nullptr) ? nullptr : parent_->FindVar(name);
return (parent_ == nullptr) ? nullptr : parent_->FindVarInternal(name);
}

const Scope* Scope::FindScope(const Variable* var) const {
Expand All @@ -78,6 +86,7 @@ const Scope* Scope::FindScope(const Variable* var) const {
return (parent_ == nullptr) ? nullptr : parent_->FindScope(var);
}
void Scope::DropKids() {
std::unique_lock<std::mutex> lock(mutex_);
for (Scope* s : kids_) delete s;
kids_.clear();
}
Expand Down Expand Up @@ -105,6 +114,7 @@ void Scope::DeleteScope(Scope* scope) const {
}

void Scope::EraseVars(const std::vector<std::string>& var_names) {
std::unique_lock<std::mutex> lock(mutex_);
std::set<std::string> var_set(var_names.begin(), var_names.end());
for (auto it = vars_.begin(); it != vars_.end();) {
if (var_set.find(it->first) != var_set.end()) {
Expand Down
12 changes: 9 additions & 3 deletions paddle/fluid/framework/scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,20 @@ class Scope {
// Rename variable to a new name and return the new name
std::string Rename(const std::string& origin_name) const;

/// Caller doesn't own the returned Variable.
Variable* FindVarLocally(const std::string& name) const;

private:
// Call Scope::NewScope for a sub-scope.
explicit Scope(Scope const* parent) : parent_(parent) {}

// Called by FindVar recursively.
// Caller doesn't own the returned Variable.
Variable* FindVarInternal(const std::string& name) const;

// Called by FindVarInternal and Var.
// Caller doesn't own the returned Variable.
Variable* FindVarLocally(const std::string& name) const;

mutable std::unordered_map<std::string, std::unique_ptr<Variable>> vars_;

// Scope in `kids_` are owned by this class.
mutable std::list<Scope*> kids_;
Scope const* parent_{nullptr};
Expand Down
57 changes: 26 additions & 31 deletions paddle/fluid/inference/tests/book/test_inference_nlp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,22 @@ void SplitData(
}

void ThreadRunInfer(
const int tid, paddle::framework::Executor* executor,
paddle::framework::Scope* scope,
const std::unique_ptr<paddle::framework::ProgramDesc>& inference_program,
const int tid, paddle::framework::Scope* scope,
const std::vector<std::vector<const paddle::framework::LoDTensor*>>& jobs) {
auto copy_program = std::unique_ptr<paddle::framework::ProgramDesc>(
new paddle::framework::ProgramDesc(*inference_program));
// maybe framework:ProgramDesc is not thread-safe
auto& sub_scope = scope->NewScope();
auto place = paddle::platform::CPUPlace();
auto executor = paddle::framework::Executor(place);
auto inference_program =
paddle::inference::Load(&executor, scope, FLAGS_model_path);

std::string feed_holder_name = "feed_" + paddle::string::to_string(tid);
std::string fetch_holder_name = "fetch_" + paddle::string::to_string(tid);
copy_program->SetFeedHolderName(feed_holder_name);
copy_program->SetFetchHolderName(fetch_holder_name);
auto ctx = executor.Prepare(*inference_program, /*block_id*/ 0);
executor.CreateVariables(*inference_program, &sub_scope, /*block_id*/ 0);

const std::vector<std::string>& feed_target_names =
copy_program->GetFeedTargetNames();
inference_program->GetFeedTargetNames();
const std::vector<std::string>& fetch_target_names =
copy_program->GetFetchTargetNames();
inference_program->GetFetchTargetNames();

PADDLE_ENFORCE_EQ(fetch_target_names.size(), 1UL);
std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
Expand All @@ -131,9 +130,8 @@ void ThreadRunInfer(
auto start_ms = GetCurrentMs();
for (size_t i = 0; i < inputs.size(); ++i) {
feed_targets[feed_target_names[0]] = inputs[i];
executor->Run(*copy_program, &sub_scope, &feed_targets, &fetch_targets,
true /*create_local_scope*/, true /*create_vars*/,
feed_holder_name, fetch_holder_name);
executor.RunPreparedContext(ctx.get(), &sub_scope, &feed_targets,
&fetch_targets, false /*create_local_scope*/);
}
auto stop_ms = GetCurrentMs();
scope->DeleteScope(&sub_scope);
Expand All @@ -158,22 +156,10 @@ TEST(inference, nlp) {
LOG(INFO) << "Number of samples (seq_len<1024): " << datasets.size();
LOG(INFO) << "Total number of words: " << num_total_words;

const bool model_combined = false;
// 0. Call `paddle::framework::InitDevices()` initialize all the devices
// 1. Define place, executor, scope
auto place = paddle::platform::CPUPlace();
auto executor = paddle::framework::Executor(place);
std::unique_ptr<paddle::framework::Scope> scope(
new paddle::framework::Scope());

// 2. Initialize the inference_program and load parameters
std::unique_ptr<paddle::framework::ProgramDesc> inference_program;
inference_program =
InitProgram(&executor, scope.get(), FLAGS_model_path, model_combined);
if (FLAGS_use_mkldnn) {
EnableMKLDNN(inference_program);
}

#ifdef PADDLE_WITH_MKLML
// only use 1 thread number per std::thread
omp_set_dynamic(0);
Expand All @@ -189,21 +175,30 @@ TEST(inference, nlp) {
start_ms = GetCurrentMs();
for (int i = 0; i < FLAGS_num_threads; ++i) {
threads.emplace_back(
new std::thread(ThreadRunInfer, i, &executor, scope.get(),
std::ref(inference_program), std::ref(jobs)));
new std::thread(ThreadRunInfer, i, scope.get(), std::ref(jobs)));
}
for (int i = 0; i < FLAGS_num_threads; ++i) {
threads[i]->join();
}
stop_ms = GetCurrentMs();
} else {
if (FLAGS_prepare_vars) {
executor.CreateVariables(*inference_program, scope.get(), 0);
// 1. Define place, executor, scope
auto place = paddle::platform::CPUPlace();
auto executor = paddle::framework::Executor(place);

// 2. Initialize the inference_program and load parameters
std::unique_ptr<paddle::framework::ProgramDesc> inference_program;
inference_program = InitProgram(&executor, scope.get(), FLAGS_model_path,
/*model combined*/ false);
if (FLAGS_use_mkldnn) {
EnableMKLDNN(inference_program);
}
// always prepare context
std::unique_ptr<paddle::framework::ExecutorPrepareContext> ctx;
ctx = executor.Prepare(*inference_program, 0);

if (FLAGS_prepare_vars) {
executor.CreateVariables(*inference_program, scope.get(), 0);
}
// preapre fetch
const std::vector<std::string>& fetch_target_names =
inference_program->GetFetchTargetNames();
Expand Down

0 comments on commit 3a29404

Please sign in to comment.