Skip to content

Commit

Permalink
[add slots shuffle] (PaddlePaddle#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangzhen38 authored Feb 1, 2023
1 parent c5a11a9 commit f023bee
Show file tree
Hide file tree
Showing 4 changed files with 369 additions and 1 deletion.
42 changes: 42 additions & 0 deletions paddle/fluid/framework/data_feed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,48 @@ void RecordCandidateList::AddAndGet(const Record& record,
mutex_.unlock();
}

void SlotRecordCandidateList::ReSize(size_t length) {
mutex_.lock();
capacity_ = length;
CHECK(capacity_ > 0); // NOLINT
candidate_list_.clear();
candidate_list_.resize(capacity_);
full_ = false;
cur_size_ = 0;
total_size_ = 0;
mutex_.unlock();
}

void SlotRecordCandidateList::ReInit() {
mutex_.lock();
full_ = false;
cur_size_ = 0;
total_size_ = 0;
mutex_.unlock();
}

void SlotRecordCandidateList::AddAndGet(const SlotRecord& record,
SlotRecordCandidate* result) {
mutex_.lock();
size_t index = 0;
++total_size_;
auto fleet_ptr = FleetWrapper::GetInstance();

if (!full_) {
candidate_list_[cur_size_++] = record;
full_ = (cur_size_ == capacity_);
} else {
CHECK(cur_size_ == capacity_);
index = fleet_ptr->LocalRandomEngine()() % total_size_;
if (index < capacity_) {
candidate_list_[index] = record;
}
}
index = fleet_ptr->LocalRandomEngine()() % cur_size_;
*result = candidate_list_[index];
mutex_.unlock();
}

void DataFeed::AddFeedVar(Variable* var, const std::string& name) {
CheckInit();
for (size_t i = 0; i < use_slots_.size(); ++i) {
Expand Down
107 changes: 107 additions & 0 deletions paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,113 @@ class RecordCandidateList {
std::unordered_set<uint16_t> slot_index_to_replace_;
};

struct SlotRecordCandidate {
std::string ins_id_;
std::unordered_multimap<uint16_t, uint64_t> feas_;
size_t shadow_index_ = -1; // Optimization for Reservoir Sample

SlotRecordCandidate() {}
SlotRecordCandidate(
const SlotRecord& rec,
const std::unordered_set<uint16_t>& slot_index_to_replace) {
int cnt = 0;
for (auto fea = 0; fea < static_cast<int>(
rec->slot_uint64_feasigns_.slot_offsets.size()) - 1; ++fea) {
if (slot_index_to_replace.find(fea) != slot_index_to_replace.end()) {
for (auto i = rec->slot_uint64_feasigns_.slot_offsets.at(cnt);
i < rec->slot_uint64_feasigns_.slot_offsets.at(cnt + 1);
i++) {
feas_.insert({cnt, rec->slot_uint64_feasigns_.slot_values.at(i)});
}
}
++cnt;
}
}
SlotRecordCandidate& operator=(const SlotRecord& rec) {
feas_.clear();
ins_id_ = rec->ins_id_;
int len_slot =
static_cast<int>(rec->slot_uint64_feasigns_.slot_offsets.size()) - 1;
for (auto cnt = 0; cnt < len_slot; ++cnt) {
for (auto i = rec->slot_uint64_feasigns_.slot_offsets.at(cnt);
i < rec->slot_uint64_feasigns_.slot_offsets.at(cnt + 1);
i++) {
feas_.insert({cnt, rec->slot_uint64_feasigns_.slot_values.at(i)});
}
}
return *this;
}
};

class SlotRecordCandidateList {
public:
SlotRecordCandidateList() = default;
SlotRecordCandidateList(const SlotRecordCandidateList&) {}

size_t Size() { return cur_size_; }
void ReSize(size_t length);

void ReInit();
void ReInitPass() {
for (size_t i = 0; i < cur_size_; ++i) {
if (candidate_list_[i].shadow_index_ != i) {
candidate_list_[i].ins_id_ =
candidate_list_[candidate_list_[i].shadow_index_].ins_id_;
candidate_list_[i].feas_.swap(
candidate_list_[candidate_list_[i].shadow_index_].feas_);
candidate_list_[i].shadow_index_ = i;
}
}
candidate_list_.resize(cur_size_);
}

void AddAndGet(const SlotRecord& record, SlotRecordCandidate* result);
void AddAndGet(const SlotRecord& record, size_t& index_result) { // NOLINT
// std::unique_lock<std::mutex> lock(mutex_);
size_t index = 0;
++total_size_;
auto fleet_ptr = FleetWrapper::GetInstance();
if (!full_) {
candidate_list_.emplace_back(record, slot_index_to_replace_);
candidate_list_.back().shadow_index_ = cur_size_;
++cur_size_;
full_ = (cur_size_ == capacity_);
} else {
index = fleet_ptr->LocalRandomEngine()() % total_size_;
if (index < capacity_) {
candidate_list_.emplace_back(record, slot_index_to_replace_);
candidate_list_[index].shadow_index_ = candidate_list_.size() - 1;
}
}
index = fleet_ptr->LocalRandomEngine()() % cur_size_;
index_result = candidate_list_[index].shadow_index_;
}
const SlotRecordCandidate& Get(size_t index) const {
PADDLE_ENFORCE_LT(
index,
candidate_list_.size(),
platform::errors::OutOfRange("Your index [%lu] exceeds the number of "
"elements in candidate_list[%lu].",
index,
candidate_list_.size()));
return candidate_list_[index];
}
void SetSlotIndexToReplace(
const std::unordered_set<uint16_t>& slot_index_to_replace) {
slot_index_to_replace_ = slot_index_to_replace;
}

private:
size_t capacity_ = 0;
std::mutex mutex_;
bool full_ = false;
size_t cur_size_ = 0;
size_t total_size_ = 0;
std::vector<SlotRecordCandidate> candidate_list_;
std::unordered_set<uint16_t> slot_index_to_replace_;
};


template <class AR>
paddle::framework::Archive<AR>& operator<<(paddle::framework::Archive<AR>& ar,
const FeatureFeasign& fk) {
Expand Down
209 changes: 209 additions & 0 deletions paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ template <typename T>
void DatasetImpl<T>::SetFeaEval(bool fea_eval, int record_candidate_size) {
slots_shuffle_fea_eval_ = fea_eval;
slots_shuffle_rclist_.ReSize(record_candidate_size);
slots_record_shuffle_rclist_.ReSize(record_candidate_size);
VLOG(3) << "SetFeaEval fea eval mode: " << fea_eval
<< " with record candidate size: " << record_candidate_size;
}
Expand Down Expand Up @@ -1640,6 +1641,214 @@ void MultiSlotDataset::SlotsShuffle(
<< ", cost time=" << timeline.ElapsedSec() << " seconds";
}

void SlotRecordDataset::GetRandomData(
const std::unordered_set<uint16_t>& slots_to_replace,
std::vector<SlotRecord>* result) {
int debug_erase_cnt = 0;
int debug_push_cnt = 0;
auto multi_slot_desc = data_feed_desc_.multi_slot_desc();
slots_record_shuffle_rclist_.ReInit();
const auto& slots_shuffle_original_data = GetSlotsOriginalData();
// VLOG(0) << "Begin to get_random data";
for (const auto& rec : slots_shuffle_original_data) {
SlotRecordCandidate rand_rec;
SlotRecord new_rec = rec;

slots_record_shuffle_rclist_.AddAndGet(rec, &rand_rec);
// VLOG(0) << "shuffle_done";
std::vector<uint64_t> slot_v;
// support multi slot shuffle one time;
for (auto it:slots_to_replace) {

int erase_begin =
new_rec->slot_uint64_feasigns_.slot_offsets.at(it);
int erase_end =
new_rec->slot_uint64_feasigns_.slot_offsets.at(it + 1);
auto slot_v_it = new_rec->slot_uint64_feasigns_.slot_values.begin()
+ erase_begin;
for (int idx = erase_begin;
slot_v_it != new_rec->slot_uint64_feasigns_.slot_values.end() &&
idx < erase_end; ++idx) {
slot_v_it =
new_rec->slot_uint64_feasigns_.slot_values.erase(slot_v_it);
debug_erase_cnt += 1;
}
auto range = rand_rec.feas_.equal_range(it);
slot_v.clear();
for (auto it_val = range.first; it_val != range.second; ++it_val) {
slot_v.push_back(it_val->second);
debug_push_cnt += 1;
}

new_rec->slot_uint64_feasigns_.slot_values.insert(
new_rec->slot_uint64_feasigns_.slot_values.begin() + erase_begin,
slot_v.begin(),
slot_v.end());
// update slot_offset afterwards
for (auto k = it + 1; k < static_cast<int>
(new_rec->slot_uint64_feasigns_.slot_offsets.size()); ++k) {
new_rec->slot_uint64_feasigns_.slot_offsets.at(k) =
new_rec->slot_uint64_feasigns_.slot_offsets.at(k)
- (erase_end - erase_begin) + slot_v.size();
}
}

result->push_back(std::move(new_rec));
}
VLOG(0) << "Result size:" << result->size();
VLOG(0) << "End to get_random data";
VLOG(2) << "erase feasign num: " << debug_erase_cnt
<< " repush feasign num: " << debug_push_cnt;
}

void SlotRecordDataset::PreprocessChannel(
const std::set<std::string>& slots_to_replace,
std::unordered_set<uint16_t>& index_slots) { // NOLINT
int out_channel_size = 0;
if (cur_channel_ == 0) {
for (size_t i = 0; i < multi_output_channel_.size(); ++i) {
out_channel_size += multi_output_channel_[i]->Size();
}
} else {
for (size_t i = 0; i < multi_consume_channel_.size(); ++i) {
out_channel_size += multi_consume_channel_[i]->Size();
}
}
VLOG(2) << "DatasetImpl<T>::SlotsShuffle() begin with input channel size: "
<< input_channel_->Size()
<< " output channel size: " << out_channel_size;

if ((!input_channel_ || input_channel_->Size() == 0) &&
slots_shuffle_original_data_.size() == 0 && out_channel_size == 0) {
VLOG(3) << "DatasetImpl<T>::SlotsShuffle() end, no data to slots shuffle";
return;
}

auto multi_slot_desc = data_feed_desc_.multi_slot_desc();
auto i_u = -1;
for (int i = 0; i < multi_slot_desc.slots_size(); ++i) {
std::string cur_slot_type = multi_slot_desc.slots(i).type();
std::string cur_slot = multi_slot_desc.slots(i).name();
if (cur_slot_type[0] == 'u') ++i_u;
if (cur_slot_type[0] == 'f' || (cur_slot[0] > 'a' && cur_slot[0] < 'z')) {
continue;
}
if (slots_to_replace.find(cur_slot) != slots_to_replace.end()) {
index_slots.insert(i_u);
}
}
CHECK(index_slots.size() > 0) << "the shuffle slot is out of dict!";
if (slots_shuffle_original_data_.size() == 0) {
// before first slots shuffle, instances could be in
// input_channel, oupput_channel or consume_channel
if (input_channel_ && input_channel_->Size() != 0) {
slots_shuffle_original_data_.reserve(input_channel_->Size());
input_channel_->Close();
input_channel_->ReadAll(slots_shuffle_original_data_);
} else {
CHECK(out_channel_size > 0); // NOLINT
if (cur_channel_ == 0) {
for (size_t i = 0; i < multi_output_channel_.size(); ++i) {
std::vector<SlotRecord> vec_data;
multi_output_channel_[i]->Close();
multi_output_channel_[i]->ReadAll(vec_data);
slots_shuffle_original_data_.reserve(
slots_shuffle_original_data_.size() + vec_data.size());
slots_shuffle_original_data_.insert(
slots_shuffle_original_data_.end(),
std::make_move_iterator(vec_data.begin()),
std::make_move_iterator(vec_data.end()));
vec_data.clear();
vec_data.shrink_to_fit();
multi_output_channel_[i]->Clear();
}
} else {
for (size_t i = 0; i < multi_consume_channel_.size(); ++i) {
std::vector<SlotRecord> vec_data;
multi_consume_channel_[i]->Close();
multi_consume_channel_[i]->ReadAll(vec_data);
slots_shuffle_original_data_.reserve(
slots_shuffle_original_data_.size() + vec_data.size());
slots_shuffle_original_data_.insert(
slots_shuffle_original_data_.end(),
std::make_move_iterator(vec_data.begin()),
std::make_move_iterator(vec_data.end()));
vec_data.clear();
vec_data.shrink_to_fit();
multi_consume_channel_[i]->Clear();
}
}
}
} else {
// if already have original data for slots shuffle, clear channel
input_channel_->Clear();
if (cur_channel_ == 0) {
for (size_t i = 0; i < multi_output_channel_.size(); ++i) {
if (!multi_output_channel_[i]) {
continue;
}
multi_output_channel_[i]->Clear();
}
} else {
for (size_t i = 0; i < multi_consume_channel_.size(); ++i) {
if (!multi_consume_channel_[i]) {
continue;
}
multi_consume_channel_[i]->Clear();
}
}
}
int end_size = 0;
if (cur_channel_ == 0) {
for (size_t i = 0; i < multi_output_channel_.size(); ++i) {
if (!multi_output_channel_[i]) {
continue;
}
end_size += multi_output_channel_[i]->Size();
}
} else {
for (size_t i = 0; i < multi_consume_channel_.size(); ++i) {
if (!multi_consume_channel_[i]) {
continue;
}
end_size += multi_consume_channel_[i]->Size();
}
}
CHECK(input_channel_->Size() == 0)
<< "input channel should be empty before slots shuffle";

}

// slots shuffle to input_channel_ with needed-shuffle slots
void SlotRecordDataset::SlotsShuffle(
const std::set<std::string>& slots_to_replace) {
PADDLE_ENFORCE_EQ(slots_shuffle_fea_eval_,
true,
platform::errors::PreconditionNotMet(
"fea eval mode off, need to set on for slots shuffle"));
platform::Timer timeline;
timeline.Start();
std::unordered_set<uint16_t> index_slots;
PreprocessChannel(slots_to_replace, index_slots);
// VLOG(0) << "Proprocess done";
std::vector<SlotRecord> random_data;
random_data.clear();
// get slots shuffled random_data
GetRandomData(index_slots, &random_data);
input_channel_->Open();
input_channel_->Write(std::move(random_data));
random_data.clear();
random_data.shrink_to_fit();
input_channel_->Close();
cur_channel_ = 0;

timeline.Pause();
VLOG(2) << "DatasetImpl<T>::SlotsShuffle() end"
<< ", memory data size for slots shuffle=" << input_channel_->Size()
<< ", cost time=" << timeline.ElapsedSec() << " seconds";
}


template class DatasetImpl<SlotRecord>;
void SlotRecordDataset::CreateChannel() {
if (input_channel_ == nullptr) {
Expand Down
Loading

0 comments on commit f023bee

Please sign in to comment.