From f578793ca2271c3a18e1db49d17282e504f9c0f3 Mon Sep 17 00:00:00 2001 From: qingshui Date: Wed, 28 Jul 2021 19:19:22 +0800 Subject: [PATCH 1/2] support share embedding --- paddle/fluid/framework/data_feed.cc | 5 + paddle/fluid/framework/data_feed.h | 65 ++++++---- paddle/fluid/framework/data_set.cc | 4 + paddle/fluid/framework/fleet/box_wrapper.cc | 124 ++++++++++---------- paddle/fluid/framework/fleet/box_wrapper.cu | 81 ++++++++----- paddle/fluid/framework/fleet/box_wrapper.h | 4 +- paddle/fluid/operators/pull_box_sparse_op.h | 6 +- paddle/fluid/platform/flags.cc | 4 + python/paddle/fluid/__init__.py | 1 + 9 files changed, 179 insertions(+), 115 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 91a03663a7f35c..d953f1966e14cd 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -2763,6 +2763,7 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByLine(void) { SlotRecordPool().put(&record_vec); } record_vec.clear(); + record_vec.shrink_to_fit(); timeline.Pause(); VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename << ", cost time=" << timeline.ElapsedSec() @@ -2925,6 +2926,7 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByCommand(void) { SlotRecordPool().put(&record_vec); } record_vec.clear(); + record_vec.shrink_to_fit(); timeline.Pause(); VLOG(3) << "LoadIntoMemory() read all lines, file=" << filename << ", lines=" << lines @@ -3177,6 +3179,7 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByLib(void) { SlotRecordPool().put(&record_vec); } record_vec.clear(); + record_vec.shrink_to_fit(); timeline.Pause(); VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename << ", cost time=" << timeline.ElapsedSec() @@ -3257,6 +3260,7 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByCommand(void) { SlotRecordPool().put(&record_vec); } record_vec.clear(); + record_vec.shrink_to_fit(); timeline.Pause(); VLOG(3) << 
"LoadIntoMemory() read all lines, file=" << filename << ", cost time=" << timeline.ElapsedSec() @@ -3481,6 +3485,7 @@ void InputTableDataFeed::LoadIntoMemoryByLib() { SlotRecordPool().put(&record_vec); } record_vec.clear(); + record_vec.shrink_to_fit(); timeline.Pause(); VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename << ", cost time=" << timeline.ElapsedSec() diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 0894778e4ddfc8..73b0cb6d0a1f41 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -54,6 +54,8 @@ DECLARE_int32(padbox_record_pool_max_size); DECLARE_int32(padbox_slotpool_thread_num); DECLARE_int32(padbox_slotrecord_extend_dim); DECLARE_bool(padbox_auc_runner_mode); +DECLARE_bool(enable_slotrecord_reset_shrink); +DECLARE_bool(enable_slotpool_wait_release); namespace paddle { namespace framework { @@ -776,10 +778,6 @@ struct SlotValues { std::vector slot_values; std::vector slot_offsets; - ~SlotValues() { - slot_values.shrink_to_fit(); - slot_offsets.shrink_to_fit(); - } void add_values(const T* values, uint32_t num) { if (slot_offsets.empty()) { slot_offsets.push_back(0); @@ -809,9 +807,13 @@ struct SlotValues { } slot_offsets[slot_num] = slot_values.size(); } - void clear(void) { + void clear(bool shrink) { slot_offsets.clear(); slot_values.clear(); + if (shrink) { + slot_values.shrink_to_fit(); + slot_offsets.shrink_to_fit(); + } } }; @@ -830,14 +832,11 @@ struct SlotRecordObject { SlotValues slot_uint64_feasigns_; SlotValues slot_float_feasigns_; - ~SlotRecordObject() { - slot_uint64_feasigns_.clear(); - slot_float_feasigns_.clear(); - } - - void reset(void) { - slot_uint64_feasigns_.clear(); - slot_float_feasigns_.clear(); + ~SlotRecordObject() { clear(true); } + void reset(void) { clear(FLAGS_enable_slotrecord_reset_shrink); } + void clear(bool shrink) { + slot_uint64_feasigns_.clear(shrink); + slot_float_feasigns_.clear(shrink); } }; using SlotRecord = 
SlotRecordObject*; @@ -891,15 +890,16 @@ inline int GetTotalFeaNum(const std::vector& slot_record, template class SlotObjAllocator { public: - SlotObjAllocator() : free_nodes_(NULL), capacity_(0) {} + explicit SlotObjAllocator(std::function deleter) + : free_nodes_(NULL), capacity_(0), deleter_(deleter) {} ~SlotObjAllocator() { clear(); } - void clear(void) { + void clear() { T* tmp = NULL; while (free_nodes_ != NULL) { tmp = reinterpret_cast(reinterpret_cast(free_nodes_)); free_nodes_ = free_nodes_->next; - delete tmp; + deleter_(tmp); --capacity_; } CHECK_EQ(capacity_, static_cast(0)); @@ -928,17 +928,21 @@ class SlotObjAllocator { }; Node* free_nodes_; // a list size_t capacity_; + std::function deleter_ = nullptr; }; static const int OBJPOOL_BLOCK_SIZE = 10000; class SlotObjPool { public: - SlotObjPool() : max_capacity_(FLAGS_padbox_record_pool_max_size) { + SlotObjPool() + : max_capacity_(FLAGS_padbox_record_pool_max_size), + alloc_(free_slotrecord) { ins_chan_ = MakeChannel(); ins_chan_->SetBlockSize(OBJPOOL_BLOCK_SIZE); for (int i = 0; i < FLAGS_padbox_slotpool_thread_num; ++i) { threads_.push_back(std::thread([this]() { run(); })); } disable_pool_ = false; + count_ = 0; } ~SlotObjPool() { ins_chan_->Close(); @@ -963,6 +967,7 @@ class SlotObjPool { } } mutex_.unlock(); + count_ += n; if (size == n) { return; } @@ -983,19 +988,23 @@ class SlotObjPool { } void run(void) { std::vector input; - while (ins_chan_->Read(input)) { + while (ins_chan_->ReadOnce(input, OBJPOOL_BLOCK_SIZE)) { if (input.empty()) { continue; } // over max capacity - if (disable_pool_ || input.size() + capacity() > max_capacity_) { + size_t n = input.size(); + count_ -= n; + if (disable_pool_ || n + capacity() > max_capacity_) { for (auto& t : input) { free_slotrecord(t); } } else { - mutex_.lock(); for (auto& t : input) { t->reset(); + } + mutex_.lock(); + for (auto& t : input) { alloc_.release(t); } mutex_.unlock(); @@ -1004,9 +1013,20 @@ class SlotObjPool { } } void clear(void) { + 
platform::Timer timeline; + timeline.Start(); mutex_.lock(); alloc_.clear(); mutex_.unlock(); + // wait release channel data + if (FLAGS_enable_slotpool_wait_release) { + while (!ins_chan_->Empty()) { + sleep(1); + } + } + timeline.Pause(); + LOG(WARNING) << "clear slot pool data size=" << count_.load() + << ", span=" << timeline.ElapsedSec(); } size_t capacity(void) { mutex_.lock(); @@ -1022,6 +1042,7 @@ class SlotObjPool { std::mutex mutex_; SlotObjAllocator alloc_; bool disable_pool_; + std::atomic count_; // NOLINT }; inline SlotObjPool& SlotRecordPool() { @@ -1570,7 +1591,9 @@ class SlotPaddleBoxDataFeed : public DataFeed { virtual ~SlotPaddleBoxDataFeed() { #if defined(PADDLE_WITH_CUDA) && defined(_LINUX) if (pack_ != nullptr) { - LOG(WARNING) << "pack batch total time: " << batch_timer_.ElapsedSec() + LOG(WARNING) << "gpu: " + << boost::get(place_).GetDeviceId() + << ", pack batch total time: " << batch_timer_.ElapsedSec() << "[copy:" << pack_->trans_time_span() << ",fill:" << fill_timer_.ElapsedSec() << ",memory:" << offset_timer_.ElapsedSec() diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 691cf001f43f6c..9af68f1a6920f6 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -1808,6 +1808,10 @@ void PadBoxSlotDataset::ShuffleData(int thread_num) { wg.wait(); timer.Pause(); + data.shrink_to_fit(); + loc_datas.shrink_to_fit(); + releases.shrink_to_fit(); + double span = timer.ElapsedSec(); if (span > max_shuffle_span_) { max_shuffle_span_ = span; diff --git a/paddle/fluid/framework/fleet/box_wrapper.cc b/paddle/fluid/framework/fleet/box_wrapper.cc index 6cb395d3fc342f..e2b8cd642d11c5 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cc +++ b/paddle/fluid/framework/fleet/box_wrapper.cc @@ -31,10 +31,6 @@ namespace framework { std::shared_ptr BoxWrapper::s_instance_ = nullptr; std::shared_ptr BoxWrapper::data_shuffle_ = nullptr; cudaStream_t 
BoxWrapper::stream_list_[MAX_GPU_NUM]; -// int BoxWrapper::embedx_dim_ = 8; -// int BoxWrapper::expand_embed_dim_ = 0; -// int BoxWrapper::feature_type_ = 0; -// float BoxWrapper::pull_embedx_scale_ = 1.0; void BasicAucCalculator::add_unlock_data(double pred, int label) { PADDLE_ENFORCE_GE(pred, 0.0, platform::errors::PreconditionNotMet( @@ -372,24 +368,24 @@ void BasicAucCalculator::compute() { void BoxWrapper::CheckEmbedSizeIsValid(int embedx_dim, int expand_embed_dim) { if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { - PADDLE_ENFORCE_EQ( - (embedx_dim % boxps::SHARE_EMBEDDING_NUM), 0, - platform::errors::InvalidArgument( - "SetInstance(): invalid embedx_dim. " - "embedx_dim % boxps::SHARE_EMBEDDING_NUM shoule be 0")); + PADDLE_ENFORCE_EQ((embedx_dim % expand_embed_dim), 0, + platform::errors::InvalidArgument( + "SetInstance(): invalid embedx_dim. " + "embedx_dim % expand_embed_dim shoule be 0")); - embedx_dim = embedx_dim / boxps::SHARE_EMBEDDING_NUM; + embedx_dim = embedx_dim / expand_embed_dim; + } else { + PADDLE_ENFORCE_EQ(expand_embed_dim_, expand_embed_dim, + platform::errors::InvalidArgument( + "SetInstance(): invalid expand_embed_dim. When " + "expand_embed_dim = %d, but got %d.", + expand_embed_dim_, expand_embed_dim)); } PADDLE_ENFORCE_EQ( embedx_dim_, embedx_dim, platform::errors::InvalidArgument("SetInstance(): invalid embedx_dim. " "When embedx_dim = %d, but got %d.", embedx_dim_, embedx_dim)); - PADDLE_ENFORCE_EQ(expand_embed_dim_, expand_embed_dim, - platform::errors::InvalidArgument( - "SetInstance(): invalid expand_embed_dim. When " - "expand_embed_dim = %d, but got %d.", - expand_embed_dim_, expand_embed_dim)); } void BoxWrapper::PullSparse(const paddle::platform::Place& place, @@ -408,33 +404,36 @@ void BoxWrapper::PullSparse(const paddle::platform::Place& place, } \ } break -#define PULLSPARSE_CASE(i, ...) 
\ - case i: { \ - constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ - constexpr size_t SingleEmbedxDim = \ - EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \ - PullSparseCase>( \ - place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ - PullSparseCase>( \ - place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT) || \ - feature_type_ == static_cast(boxps::FEATURE_SHOWCLK)) { \ - PullSparseCase>( \ - place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ - } else { \ - PullSparseCase>( \ - place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ - } \ +#define PULLSPARSE_CASE(i, ...) \ + case i: { \ + constexpr size_t ExpandDim = i; \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + PullSparseCase< \ + boxps::FeaturePullValueGpuShareEmbedding>( \ + place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + PullSparseCase>( \ + place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_QUANT) || \ + feature_type_ == static_cast(boxps::FEATURE_SHOWCLK)) { \ + PullSparseCase>( \ + place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ + } else { \ + PullSparseCase>( \ + place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ + } \ } break CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim); - switch (hidden_size - cvm_offset_) { - EMBEDX_CASE(8, PULLSPARSE_CASE(0); PULLSPARSE_CASE(8); + switch (embedx_dim_) { + EMBEDX_CASE(8, PULLSPARSE_CASE(0); PULLSPARSE_CASE(1); PULLSPARSE_CASE(2); + PULLSPARSE_CASE(3); PULLSPARSE_CASE(4); PULLSPARSE_CASE(5); + PULLSPARSE_CASE(6); PULLSPARSE_CASE(7); PULLSPARSE_CASE(8); + 
PULLSPARSE_CASE(64);); + EMBEDX_CASE(16, PULLSPARSE_CASE(0); PULLSPARSE_CASE(1); PULLSPARSE_CASE(2); + PULLSPARSE_CASE(3); PULLSPARSE_CASE(4); PULLSPARSE_CASE(5); + PULLSPARSE_CASE(6); PULLSPARSE_CASE(7); PULLSPARSE_CASE(8); PULLSPARSE_CASE(64);); - EMBEDX_CASE(16, PULLSPARSE_CASE(0); PULLSPARSE_CASE(64);); EMBEDX_CASE(32, PULLSPARSE_CASE(0);); EMBEDX_CASE(64, PULLSPARSE_CASE(0);); EMBEDX_CASE(256, PULLSPARSE_CASE(0);); @@ -466,33 +465,36 @@ void BoxWrapper::PushSparseGrad(const paddle::platform::Place& place, } \ } break -#define PUSHSPARSE_CASE(i, ...) \ - case i: { \ - constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ - constexpr size_t SingleEmbedxDim = \ - EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \ - PushSparseGradCase>(place, keys, grad_values, slot_lengths, \ - hidden_size, expand_embed_dim, \ - batch_size); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ - PushSparseGradCase< \ - boxps::FeaturePushValueGpuPCOC>( \ - place, keys, grad_values, slot_lengths, hidden_size, \ - expand_embed_dim, batch_size); \ - } else { \ - PushSparseGradCase>( \ - place, keys, grad_values, slot_lengths, hidden_size, \ - expand_embed_dim, batch_size); \ - } \ +#define PUSHSPARSE_CASE(i, ...) 
\ + case i: { \ + constexpr size_t ExpandDim = i; \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + PushSparseGradCase< \ + boxps::FeaturePushValueGpuShareEmbedding>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + PushSparseGradCase< \ + boxps::FeaturePushValueGpuPCOC>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ + } else { \ + PushSparseGradCase>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ + } \ } break CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim); - switch (hidden_size - cvm_offset_) { - EMBEDX_CASE(8, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(8); + switch (embedx_dim_) { + EMBEDX_CASE(8, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(1); PUSHSPARSE_CASE(2); + PUSHSPARSE_CASE(3); PUSHSPARSE_CASE(4); PUSHSPARSE_CASE(5); + PUSHSPARSE_CASE(6); PUSHSPARSE_CASE(7); PUSHSPARSE_CASE(8); + PUSHSPARSE_CASE(64);); + EMBEDX_CASE(16, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(1); PUSHSPARSE_CASE(2); + PUSHSPARSE_CASE(3); PUSHSPARSE_CASE(4); PUSHSPARSE_CASE(5); + PUSHSPARSE_CASE(6); PUSHSPARSE_CASE(7); PUSHSPARSE_CASE(8); PUSHSPARSE_CASE(64);); - EMBEDX_CASE(16, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(64);); EMBEDX_CASE(32, PUSHSPARSE_CASE(0);); EMBEDX_CASE(64, PUSHSPARSE_CASE(0);); EMBEDX_CASE(256, PUSHSPARSE_CASE(0);); diff --git a/paddle/fluid/framework/fleet/box_wrapper.cu b/paddle/fluid/framework/fleet/box_wrapper.cu index 3e2f8fc63183d1..49dfb813864c90 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cu +++ b/paddle/fluid/framework/fleet/box_wrapper.cu @@ -503,15 +503,7 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place, #define EXPAND_EMBED_PULL_CASE(i, ...) 
\ case i: { \ constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ - constexpr size_t SingleEmbedxDim = \ - EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \ - FeaturePullCopy>( \ - stream, gpu_keys, gpu_values, total_values_gpu, hidden_size, \ - EmbedxDim, total_length, total_dims, slot_lens, slot_num, key2slot, \ - pull_embedx_scale_, cvm_offset_); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ FeaturePullCopy>( \ stream, gpu_keys, gpu_values, total_values_gpu, hidden_size, \ EmbedxDim, total_length, total_dims, slot_lens, slot_num, key2slot, \ @@ -555,10 +547,29 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place, } \ } break - switch (hidden_size - cvm_offset_) { - EMBEDX_CASE(8, EXPAND_EMBED_PULL_CASE(0); EXPAND_EMBED_PULL_NNCROSS(8); - EXPAND_EMBED_PULL_NNCROSS(64);); - EMBEDX_CASE(16, EXPAND_EMBED_PULL_CASE(0); EXPAND_EMBED_PULL_NNCROSS(64);); +#define EXPAND_EMBED_PULL_SHARE(i, ...) 
\ + case i: { \ + constexpr size_t ExpandDim = i; \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + FeaturePullCopy< \ + boxps::FeaturePullValueGpuShareEmbedding>( \ + stream, gpu_keys, gpu_values, total_values_gpu, hidden_size, \ + (hidden_size - cvm_offset_), total_length, total_dims, slot_lens, \ + slot_num, key2slot, pull_embedx_scale_, cvm_offset_); \ + } \ + } break + + switch (embedx_dim_) { + EMBEDX_CASE(8, EXPAND_EMBED_PULL_CASE(0); EXPAND_EMBED_PULL_SHARE(1); + EXPAND_EMBED_PULL_SHARE(2); EXPAND_EMBED_PULL_SHARE(3); + EXPAND_EMBED_PULL_SHARE(4); EXPAND_EMBED_PULL_SHARE(5); + EXPAND_EMBED_PULL_SHARE(6); EXPAND_EMBED_PULL_SHARE(7); + EXPAND_EMBED_PULL_SHARE(8); EXPAND_EMBED_PULL_NNCROSS(64);); + EMBEDX_CASE(16, EXPAND_EMBED_PULL_CASE(0); EXPAND_EMBED_PULL_SHARE(1); + EXPAND_EMBED_PULL_SHARE(2); EXPAND_EMBED_PULL_SHARE(3); + EXPAND_EMBED_PULL_SHARE(4); EXPAND_EMBED_PULL_SHARE(5); + EXPAND_EMBED_PULL_SHARE(6); EXPAND_EMBED_PULL_SHARE(7); + EXPAND_EMBED_PULL_SHARE(8); EXPAND_EMBED_PULL_NNCROSS(64);); EMBEDX_CASE(32, EXPAND_EMBED_PULL_CASE(0);); EMBEDX_CASE(64, EXPAND_EMBED_PULL_CASE(0);); EMBEDX_CASE(256, EXPAND_EMBED_PULL_CASE(0);); @@ -637,10 +648,9 @@ void FeaturePushCopyNNCross(cudaStream_t stream, void* dest, template void FeaturePushCopyShareEmbedding( cudaStream_t stream, void* dest, float** grad_values, const int hidden_size, - const size_t embedx_dim, const size_t expand_dim, const int total_length, - const int batch_size, const int* slot_vector, const int* total_dims, - const int64_t* slot_lens, const int slot_num, const int* key2slot, - const int cvm_offset) { + const size_t embedx_dim, const int total_length, const int batch_size, + const int* slot_vector, const int* total_dims, const int64_t* slot_lens, + const int slot_num, const int* key2slot, const int cvm_offset) { FeaturePushValueGpuType* push_grad_values = reinterpret_cast(dest); // share embedding @@ -680,15 +690,7 @@ void BoxWrapper::CopyForPush(const 
paddle::platform::Place& place, #define EXPAND_EMBED_PUSH_CASE(i, ...) \ case i: { \ constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ - constexpr size_t SingleEmbedxDim = \ - EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \ - FeaturePushCopyShareEmbedding>( \ - stream, total_grad_values_gpu, grad_values, hidden_size, EmbedxDim, \ - ExpandDim, total_length, batch_size, d_slot_vector, total_dims, \ - slot_lens, slot_num, key2slot, cvm_offset_); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ FeaturePushCopy>( \ stream, total_grad_values_gpu, grad_values, hidden_size, EmbedxDim, \ total_length, batch_size, d_slot_vector, total_dims, slot_lens, \ @@ -719,10 +721,29 @@ void BoxWrapper::CopyForPush(const paddle::platform::Place& place, } \ } break - switch (hidden_size - cvm_offset_) { - EMBEDX_CASE(8, EXPAND_EMBED_PUSH_CASE(0); EXPAND_EMBED_PUSH_NNCROSS(8); - EXPAND_EMBED_PUSH_NNCROSS(64);); - EMBEDX_CASE(16, EXPAND_EMBED_PUSH_CASE(0); EXPAND_EMBED_PUSH_NNCROSS(64);); +#define EXPAND_EMBED_PUSH_SHARE(i, ...) 
\ + case i: { \ + constexpr size_t ExpandDim = i; \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + FeaturePushCopyShareEmbedding< \ + boxps::FeaturePushValueGpuShareEmbedding>( \ + stream, total_grad_values_gpu, grad_values, hidden_size, \ + (hidden_size - cvm_offset_), total_length, batch_size, \ + d_slot_vector, total_dims, slot_lens, slot_num, key2slot, \ + cvm_offset_); \ + } \ + } break + switch (embedx_dim_) { + EMBEDX_CASE(8, EXPAND_EMBED_PUSH_CASE(0); EXPAND_EMBED_PUSH_SHARE(1); + EXPAND_EMBED_PUSH_SHARE(2); EXPAND_EMBED_PUSH_SHARE(3); + EXPAND_EMBED_PUSH_SHARE(4); EXPAND_EMBED_PUSH_SHARE(5); + EXPAND_EMBED_PUSH_SHARE(6); EXPAND_EMBED_PUSH_SHARE(7); + EXPAND_EMBED_PUSH_SHARE(8); EXPAND_EMBED_PUSH_NNCROSS(64);); + EMBEDX_CASE(16, EXPAND_EMBED_PUSH_CASE(0); EXPAND_EMBED_PUSH_SHARE(1); + EXPAND_EMBED_PUSH_SHARE(2); EXPAND_EMBED_PUSH_SHARE(3); + EXPAND_EMBED_PUSH_SHARE(4); EXPAND_EMBED_PUSH_SHARE(5); + EXPAND_EMBED_PUSH_SHARE(6); EXPAND_EMBED_PUSH_SHARE(7); + EXPAND_EMBED_PUSH_SHARE(8); EXPAND_EMBED_PUSH_NNCROSS(64);); EMBEDX_CASE(32, EXPAND_EMBED_PUSH_CASE(0);); EMBEDX_CASE(64, EXPAND_EMBED_PUSH_CASE(0);); EMBEDX_CASE(256, EXPAND_EMBED_PUSH_CASE(0);); diff --git a/paddle/fluid/framework/fleet/box_wrapper.h b/paddle/fluid/framework/fleet/box_wrapper.h index ddc39bf04b1731..1e75a1a7340892 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.h +++ b/paddle/fluid/framework/fleet/box_wrapper.h @@ -572,7 +572,7 @@ class BoxWrapper { // ToDo: feature gpu value param set diffent value if (s_instance_->feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { - s_instance_->cvm_offset_ = boxps::SHARE_EMBEDDING_NUM + 2; + s_instance_->cvm_offset_ = expand_embed_dim + 2; } else if (s_instance_->feature_type_ == static_cast(boxps::FEATURE_PCOC)) { s_instance_->cvm_offset_ = 8; @@ -1140,6 +1140,8 @@ class BoxWrapper { << ", wrapper gpu memory:" << dev.GpuMemUsed() << "MB"; dev.ResetTimer(); } + // get expand embed dim + int 
GetExpandEmbedDim(void) { return expand_embed_dim_; } private: static cudaStream_t stream_list_[MAX_GPU_NUM]; diff --git a/paddle/fluid/operators/pull_box_sparse_op.h b/paddle/fluid/operators/pull_box_sparse_op.h index ed39c87e85ee96..328ca40d1add70 100644 --- a/paddle/fluid/operators/pull_box_sparse_op.h +++ b/paddle/fluid/operators/pull_box_sparse_op.h @@ -137,8 +137,9 @@ static void PullBoxSparseFunctor(const framework::ExecutionContext &ctx) { #ifdef PADDLE_WITH_BOX_PS auto box_ptr = paddle::framework::BoxWrapper::GetInstance(); + auto expand_dim = box_ptr->GetExpandEmbedDim(); box_ptr->PullSparse(ctx.GetPlace(), all_keys, all_values, slot_lengths, - hidden_size, 0); + hidden_size, expand_dim); #endif } @@ -176,8 +177,9 @@ static void PushBoxSparseFunctor(const framework::ExecutionContext &ctx) { #ifdef PADDLE_WITH_BOX_PS auto hidden_size = ctx.Attr("size"); auto box_ptr = paddle::framework::BoxWrapper::GetInstance(); + auto expand_dim = box_ptr->GetExpandEmbedDim(); box_ptr->PushSparseGrad(ctx.GetPlace(), all_keys, all_grad_values, - slot_lengths, hidden_size, 0, batch_size); + slot_lengths, hidden_size, expand_dim, batch_size); #endif } diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc index 94a379d93808f3..56329a843c63e5 100644 --- a/paddle/fluid/platform/flags.cc +++ b/paddle/fluid/platform/flags.cc @@ -596,3 +596,7 @@ DEFINE_bool(enable_ins_parser_file, false, "enable parser ins file , default false"); DEFINE_bool(enable_dense_nccl_barrier, false, "enable dense nccl barrier , default false"); +DEFINE_bool(enable_slotrecord_reset_shrink, false, + "enable slotrecord obejct reset shrink memory, default false"); +DEFINE_bool(enable_slotpool_wait_release, false, + "enable slotrecord obejct wait release, default false"); diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 2ff88b0077eebb..330aaa4910ffca 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -263,6 +263,7 @@ 
def __bootstrap__(): 'enable_binding_train_cpu', 'enable_ins_parser_file', 'enable_dense_nccl_barrier', + 'enable_slotrecord_reset_shrink', ] core.init_gflags(["--tryfromenv=" + ",".join(read_env_flags)]) core.init_glog(sys.argv[0]) From 983918d73fd31f89440f8c8f2c39a27fe6718220 Mon Sep 17 00:00:00 2001 From: qingshui Date: Thu, 29 Jul 2021 10:21:39 +0800 Subject: [PATCH 2/2] add boxps header --- cmake/external/box_ps.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/external/box_ps.cmake b/cmake/external/box_ps.cmake index b352c50b5cf2e0..9d72a9957dd457 100644 --- a/cmake/external/box_ps.cmake +++ b/cmake/external/box_ps.cmake @@ -20,7 +20,7 @@ IF((NOT DEFINED BOX_PS_VER) OR (NOT DEFINED BOX_PS_URL)) SET(BOX_PS_VER "0.1.1" CACHE STRING "" FORCE) SET(BOX_PS_NAME "box_ps" CACHE STRING "" FORCE) #SET(BOX_PS_URL "http://box-ps.gz.bcebos.com/box_ps.tar.gz" CACHE STRING "" FORCE) - SET(BOX_PS_URL "data-im.baidu.com:/home/work/var/CI_DATA/im/static/box_ps.tar.gz/box_ps.tar.gz.14" CACHE STRING "" FORCE) + SET(BOX_PS_URL "data-im.baidu.com:/home/work/var/CI_DATA/im/static/box_ps.tar.gz/box_ps.tar.gz.15" CACHE STRING "" FORCE) ENDIF() MESSAGE(STATUS "BOX_PS_NAME: ${BOX_PS_NAME}, BOX_PS_URL: ${BOX_PS_URL}") SET(BOX_PS_SOURCE_DIR "${THIRD_PARTY_PATH}/box_ps")