Skip to content

Commit

Permalink
fix bagging
Browse files Browse the repository at this point in the history
  • Loading branch information
guolinke committed Feb 23, 2020
1 parent 790c1e3 commit 6fc4966
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 232 deletions.
2 changes: 1 addition & 1 deletion docs/Parameters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ IO Parameters

- ``data_random_seed`` :raw-html:`<a id="data_random_seed" title="Permalink to this parameter" href="#data_random_seed">&#x1F517;&#xFE0E;</a>`, default = ``1``, type = int, aliases: ``data_seed``

- random seed for data partition in parallel learning (excluding the ``feature_parallel`` mode)
- random seed for sampling data to construct histogram bins

- ``output_model`` :raw-html:`<a id="output_model" title="Permalink to this parameter" href="#output_model">&#x1F517;&#xFE0E;</a>`, default = ``LightGBM_model.txt``, type = string, aliases: ``model_output``, ``model_out``

Expand Down
2 changes: 1 addition & 1 deletion include/LightGBM/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ struct Config {
double histogram_pool_size = -1.0;

// alias = data_seed
// desc = random seed for data partition in parallel learning (excluding the ``feature_parallel`` mode)
// desc = random seed for sampling data to construct histogram bins
int data_random_seed = 1;

// alias = model_output, model_out
Expand Down
135 changes: 132 additions & 3 deletions include/LightGBM/utils/threading.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
/*!
* Copyright (c) 2016 Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE file in the project root for license information.
* Licensed under the MIT License. See LICENSE file in the project root for
* license information.
*/
#ifndef LIGHTGBM_UTILS_THREADING_H_
#define LIGHTGBM_UTILS_THREADING_H_

#include <LightGBM/meta.h>
#include <LightGBM/utils/common.h>
#include <LightGBM/utils/openmp_wrapper.h>

#include <functional>
Expand Down Expand Up @@ -37,6 +40,23 @@ class Threading {
*block_size = cnt;
}
}
// Split [0, cnt) into at most num_threads blocks, forcing every block
// size (except possibly the last) to be a multiple of min_cnt_per_block.
template <typename INDEX_T>
static inline void BlockInfoForceSize(int num_threads, INDEX_T cnt,
                                      INDEX_T min_cnt_per_block,
                                      int* out_nblock, INDEX_T* block_size) {
  const int max_blocks =
      static_cast<int>((cnt + min_cnt_per_block - 1) / min_cnt_per_block);
  const int nblock = num_threads < max_blocks ? num_threads : max_blocks;
  *out_nblock = nblock;
  if (nblock <= 1) {
    // a single block takes everything
    *block_size = cnt;
  } else {
    const INDEX_T even_size = (cnt + nblock - 1) / nblock;
    // round the even split up to the next multiple of min_cnt_per_block
    const INDEX_T num_units =
        (even_size + min_cnt_per_block - 1) / min_cnt_per_block;
    *block_size = num_units * min_cnt_per_block;
  }
}

template <typename INDEX_T>
static inline int For(
INDEX_T start, INDEX_T end, INDEX_T min_block_size,
Expand All @@ -58,6 +78,115 @@ class Threading {
}
};

} // namespace LightGBM
/*!
 * \brief Runs a two-way partition of the index range [0, cnt) in parallel
 *        and gathers the result into one output buffer: all "left" indices
 *        first, then all "right" indices, preserving block order.
 *
 * \tparam INDEX_T    index type used for counts and positions
 * \tparam TWO_BUFFER if true, the callback writes right indices into a
 *                    separate buffer; if false, right indices are written
 *                    backwards into the tail of the left buffer and
 *                    re-reversed here
 */
template <typename INDEX_T, bool TWO_BUFFER>
class ParallelPartitionRunner {
 public:
  ParallelPartitionRunner(INDEX_T num_data, INDEX_T min_block_size)
      : min_block_size_(min_block_size) {
    num_threads_ = 1;
#pragma omp parallel
#pragma omp master
    { num_threads_ = omp_get_num_threads(); }
    left_.resize(num_data);
    if (TWO_BUFFER) {
      right_.resize(num_data);
    }
    offsets_.resize(num_threads_);
    left_cnts_.resize(num_threads_);
    right_cnts_.resize(num_threads_);
    left_write_pos_.resize(num_threads_);
    right_write_pos_.resize(num_threads_);
  }
  ~ParallelPartitionRunner() {}

  /*! \brief Re-allocates the scratch buffers for a new data size. */
  void ReSize(INDEX_T num_data) {
    left_.resize(num_data);
    if (TWO_BUFFER) {
      right_.resize(num_data);
    }
  }

  /*!
   * \brief Partitions [0, cnt) using func and writes the result to out.
   * \tparam FORCE_SIZE if true, force block sizes to be multiples of the
   *         minimal block size (see Threading::BlockInfoForceSize)
   * \param cnt number of indices to partition
   * \param func callback(block_id, start, cnt, left_buf, right_buf);
   *        returns how many indices it wrote to left_buf
   * \param out output buffer with room for cnt indices
   * \return total number of left indices
   */
  template <bool FORCE_SIZE>
  INDEX_T Run(
      INDEX_T cnt,
      const std::function<INDEX_T(int, INDEX_T, INDEX_T, INDEX_T*, INDEX_T*)>& func,
      INDEX_T* out) {
    if (cnt <= 0) {
      // guard: with cnt == 0, BlockInfoForceSize yields nblock == 0 and the
      // prefix-sum read of left_cnts_[nblock - 1] below would be out of bounds
      return 0;
    }
    int nblock = 1;
    INDEX_T inner_size = cnt;
    if (FORCE_SIZE) {
      Threading::BlockInfoForceSize<INDEX_T>(num_threads_, cnt, min_block_size_,
                                             &nblock, &inner_size);
    } else {
      Threading::BlockInfo<INDEX_T>(num_threads_, cnt, min_block_size_, &nblock,
                                    &inner_size);
    }

    OMP_INIT_EX();
#pragma omp parallel for schedule(static, 1)
    for (int i = 0; i < nblock; ++i) {
      OMP_LOOP_EX_BEGIN();
      INDEX_T cur_start = i * inner_size;
      INDEX_T cur_cnt = std::min(inner_size, cnt - cur_start);
      offsets_[i] = cur_start;
      if (cur_cnt <= 0) {
        left_cnts_[i] = 0;
        right_cnts_[i] = 0;
        continue;
      }
      auto left_ptr = left_.data() + cur_start;
      INDEX_T* right_ptr = nullptr;
      if (TWO_BUFFER) {
        right_ptr = right_.data() + cur_start;
      }
      // partition inside each block to reduce the number of callback calls
      INDEX_T cur_left_count =
          func(i, cur_start, cur_cnt, left_ptr, right_ptr);
      if (!TWO_BUFFER) {
        // single-buffer mode fills the right part from the tail backwards;
        // reverse it to restore the original order
        std::reverse(left_ptr + cur_left_count, left_ptr + cur_cnt);
      }
      left_cnts_[i] = cur_left_count;
      right_cnts_[i] = cur_cnt - cur_left_count;
      OMP_LOOP_EX_END();
    }
    OMP_THROW_EX();

    // prefix sums give every block its write position in the output
    left_write_pos_[0] = 0;
    right_write_pos_[0] = 0;
    for (int i = 1; i < nblock; ++i) {
      left_write_pos_[i] = left_write_pos_[i - 1] + left_cnts_[i - 1];
      right_write_pos_[i] = right_write_pos_[i - 1] + right_cnts_[i - 1];
    }
    // use INDEX_T (not a fixed project typedef) so the template works for
    // any index type
    INDEX_T left_cnt = left_write_pos_[nblock - 1] + left_cnts_[nblock - 1];

    auto right_start = out + left_cnt;
#pragma omp parallel for schedule(static)
    for (int i = 0; i < nblock; ++i) {
      std::copy_n(left_.data() + offsets_[i], left_cnts_[i],
                  out + left_write_pos_[i]);
      if (TWO_BUFFER) {
        std::copy_n(right_.data() + offsets_[i], right_cnts_[i],
                    right_start + right_write_pos_[i]);
      } else {
        std::copy_n(left_.data() + offsets_[i] + left_cnts_[i], right_cnts_[i],
                    right_start + right_write_pos_[i]);
      }
    }
    return left_cnt;
  }

 private:
  /*! \brief number of OpenMP threads available at construction time */
  int num_threads_;
  /*! \brief minimal number of indices one block should handle */
  INDEX_T min_block_size_;
  /*! \brief scratch buffer for left indices (holds both parts in
   *         single-buffer mode) */
  std::vector<INDEX_T> left_;
  /*! \brief scratch buffer for right indices (TWO_BUFFER mode only) */
  std::vector<INDEX_T> right_;
  /*! \brief start offset of each block */
  std::vector<INDEX_T> offsets_;
  /*! \brief number of left indices produced by each block */
  std::vector<INDEX_T> left_cnts_;
  /*! \brief number of right indices produced by each block */
  std::vector<INDEX_T> right_cnts_;
  /*! \brief output position of each block's left part */
  std::vector<INDEX_T> left_write_pos_;
  /*! \brief output position of each block's right part */
  std::vector<INDEX_T> right_write_pos_;
};

} // namespace LightGBM

#endif  // LIGHTGBM_UTILS_THREADING_H_
144 changes: 52 additions & 92 deletions src/boosting/gbdt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,21 @@

namespace LightGBM {

GBDT::GBDT() : iter_(0),
train_data_(nullptr),
objective_function_(nullptr),
early_stopping_round_(0),
es_first_metric_only_(false),
max_feature_idx_(0),
num_tree_per_iteration_(1),
num_class_(1),
num_iteration_for_pred_(0),
shrinkage_rate_(0.1f),
num_init_iteration_(0),
need_re_bagging_(false),
balanced_bagging_(false) {
#pragma omp parallel
#pragma omp master
{
num_threads_ = omp_get_num_threads();
}
// Default constructor: zero/null-initializes all scalar and pointer state.
GBDT::GBDT()
    : iter_(0),
      train_data_(nullptr),
      objective_function_(nullptr),
      early_stopping_round_(0),
      es_first_metric_only_(false),
      max_feature_idx_(0),
      num_tree_per_iteration_(1),
      num_class_(1),
      num_iteration_for_pred_(0),
      shrinkage_rate_(0.1f),
      num_init_iteration_(0),
      need_re_bagging_(false),
      balanced_bagging_(false),
      // runner starts with zero data; ResetBaggingConfig() resizes it to
      // num_data_ once the dataset is known
      bagging_runner_(0, bagging_rand_block_) {
  average_output_ = false;
  tree_learner_ = nullptr;
}
Expand Down Expand Up @@ -164,53 +161,50 @@ void GBDT::Boosting() {
GetGradients(GetTrainingScore(&num_score), gradients_.data(), hessians_.data());
}

// Samples the records in [start, start + cnt) for one bagging block.
// Each record is kept with probability bagging_fraction; in-bag indices are
// written to the front of buffer and out-of-bag indices to the back (in
// reverse order). Using the RNG of each record's bagging_rand_block_-sized
// chunk keeps the sample independent of the threading layout.
// Returns the number of in-bag (left) indices.
data_size_t GBDT::BaggingHelper(data_size_t start, data_size_t cnt, data_size_t* buffer) {
  if (cnt <= 0) {
    return 0;
  }
  data_size_t cur_left_cnt = 0;
  data_size_t cur_right_pos = cnt;
  // random bagging, minimal unit is one record
  for (data_size_t i = 0; i < cnt; ++i) {
    auto cur_idx = start + i;
    if (bagging_rands_[cur_idx / bagging_rand_block_].NextFloat() <
        config_->bagging_fraction) {
      buffer[cur_left_cnt++] = cur_idx;
    } else {
      buffer[--cur_right_pos] = cur_idx;
    }
  }
  return cur_left_cnt;
}

// Samples [start, start + cnt) with class-dependent rates: positive-label
// records are kept with probability pos_bagging_fraction, the rest with
// neg_bagging_fraction. In-bag indices go to the front of buffer,
// out-of-bag indices to the back (in reverse order). Per-chunk RNGs make
// the sample independent of the threading layout.
// Returns the number of in-bag (left) indices.
data_size_t GBDT::BalancedBaggingHelper(data_size_t start, data_size_t cnt,
                                        data_size_t* buffer) {
  if (cnt <= 0) {
    return 0;
  }
  auto label_ptr = train_data_->metadata().label();
  data_size_t cur_left_cnt = 0;
  data_size_t cur_right_pos = cnt;
  // random bagging, minimal unit is one record
  for (data_size_t i = 0; i < cnt; ++i) {
    auto cur_idx = start + i;
    bool is_pos = label_ptr[start + i] > 0;
    bool is_in_bag = false;
    if (is_pos) {
      is_in_bag = bagging_rands_[cur_idx / bagging_rand_block_].NextFloat() <
                  config_->pos_bagging_fraction;
    } else {
      is_in_bag = bagging_rands_[cur_idx / bagging_rand_block_].NextFloat() <
                  config_->neg_bagging_fraction;
    }
    if (is_in_bag) {
      buffer[cur_left_cnt++] = cur_idx;
    } else {
      buffer[--cur_right_pos] = cur_idx;
    }
  }
  return cur_left_cnt;
}

Expand All @@ -220,54 +214,20 @@ void GBDT::Bagging(int iter) {
if ((bag_data_cnt_ < num_data_ && iter % config_->bagging_freq == 0) ||
need_re_bagging_) {
need_re_bagging_ = false;
int n_block = Threading::For<data_size_t>(
0, num_data_, 1024,
[this, iter](int i, data_size_t cur_start, data_size_t cur_end) {
data_size_t cur_cnt = cur_end - cur_start;
if (cur_cnt <= 0) {
left_cnts_buf_[i] = 0;
right_cnts_buf_[i] = 0;
auto left_cnt = bagging_runner_.Run<true>(
num_data_,
[=](int, data_size_t cur_start, data_size_t cur_cnt, data_size_t* left,
data_size_t*) {
data_size_t cur_left_count = 0;
if (balanced_bagging_) {
cur_left_count =
BalancedBaggingHelper(cur_start, cur_cnt, left);
} else {
Random cur_rand(config_->bagging_seed + iter * num_threads_ + i);
data_size_t cur_left_count = 0;
if (balanced_bagging_) {
cur_left_count =
BalancedBaggingHelper(&cur_rand, cur_start, cur_cnt,
tmp_indices_.data() + cur_start);
} else {
cur_left_count = BaggingHelper(&cur_rand, cur_start, cur_cnt,
tmp_indices_.data() + cur_start);
}
offsets_buf_[i] = cur_start;
left_cnts_buf_[i] = cur_left_count;
right_cnts_buf_[i] = cur_cnt - cur_left_count;
cur_left_count = BaggingHelper(cur_start, cur_cnt, left);
}
});
data_size_t left_cnt = 0;
left_write_pos_buf_[0] = 0;
right_write_pos_buf_[0] = 0;
for (int i = 1; i < n_block; ++i) {
left_write_pos_buf_[i] =
left_write_pos_buf_[i - 1] + left_cnts_buf_[i - 1];
right_write_pos_buf_[i] =
right_write_pos_buf_[i - 1] + right_cnts_buf_[i - 1];
}
left_cnt = left_write_pos_buf_[n_block - 1] + left_cnts_buf_[n_block - 1];

#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < n_block; ++i) {
if (left_cnts_buf_[i] > 0) {
std::memcpy(bag_data_indices_.data() + left_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i],
left_cnts_buf_[i] * sizeof(data_size_t));
}
if (right_cnts_buf_[i] > 0) {
std::memcpy(
bag_data_indices_.data() + left_cnt + right_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i] + left_cnts_buf_[i],
right_cnts_buf_[i] * sizeof(data_size_t));
}
}
return cur_left_count;
},
bag_data_indices_.data());
bag_data_cnt_ = left_cnt;
Log::Debug("Re-bagging, using %d data to train", bag_data_cnt_);
// set bagging data to tree learner
Expand Down Expand Up @@ -780,15 +740,15 @@ void GBDT::ResetBaggingConfig(const Config* config, bool is_change_dataset) {
bag_data_cnt_ = static_cast<data_size_t>(config->bagging_fraction * num_data_);
}
bag_data_indices_.resize(num_data_);
tmp_indices_.resize(num_data_);

offsets_buf_.resize(num_threads_);
left_cnts_buf_.resize(num_threads_);
right_cnts_buf_.resize(num_threads_);
left_write_pos_buf_.resize(num_threads_);
right_write_pos_buf_.resize(num_threads_);
bagging_runner_.ReSize(num_data_);
bagging_rands_.clear();
for (int i = 0;
i < (num_data_ + bagging_rand_block_ - 1) / bagging_rand_block_; ++i) {
bagging_rands_.emplace_back(config_->bagging_seed + i);
}

double average_bag_rate = (bag_data_cnt_ / num_data_) / config->bagging_freq;
double average_bag_rate =
(static_cast<double>(bag_data_cnt_) / num_data_) / config->bagging_freq;
is_use_subset_ = false;
const int group_threshold_usesubset = 100;
if (tree_learner_->IsHistColWise() && average_bag_rate <= 0.5
Expand All @@ -813,7 +773,7 @@ void GBDT::ResetBaggingConfig(const Config* config, bool is_change_dataset) {
} else {
bag_data_cnt_ = num_data_;
bag_data_indices_.clear();
tmp_indices_.clear();
bagging_runner_.ReSize(0);
is_use_subset_ = false;
}
}
Expand Down
Loading

0 comments on commit 6fc4966

Please sign in to comment.