Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PushCSC for SparsePage. #4193

Merged
merged 3 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 7 additions & 32 deletions include/xgboost/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,42 +250,17 @@ class SparsePage {
* \brief Push row block into the page.
* \param batch the row batch.
*/
inline void Push(const dmlc::RowBlock<uint32_t>& batch) {
auto& data_vec = data.HostVector();
auto& offset_vec = offset.HostVector();
// Reserve up front: one entry per value stored in the block, one offset per row.
data_vec.reserve(data.Size() + batch.offset[batch.size] - batch.offset[0]);
offset_vec.reserve(offset.Size() + batch.size);
CHECK(batch.index != nullptr);
// Append one running end-offset per row, continuing from the last offset
// already stored in this page.
// NOTE(review): offset_vec.back() presumes the offset vector is never empty
// (e.g. always holds a leading 0) -- confirm against the constructor.
for (size_t i = 0; i < batch.size; ++i) {
offset_vec.push_back(offset_vec.back() + batch.offset[i + 1] - batch.offset[i]);
}
// Copy the (index, value) pairs; when the block carries no value array,
// every entry gets the value 1.0f.
for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) {
uint32_t index = batch.index[i];
bst_float fvalue = batch.value == nullptr ? 1.0f : batch.value[i];
data_vec.emplace_back(index, fvalue);
}
// The last offset must equal the number of stored entries.
CHECK_EQ(offset_vec.back(), data.Size());
}
void Push(const dmlc::RowBlock<uint32_t>& batch);
/*!
* \brief Push a sparse page
* \param batch the row page
*/
inline void Push(const SparsePage &batch) {
auto& data_vec = data.HostVector();
auto& offset_vec = offset.HostVector();
const auto& batch_offset_vec = batch.offset.HostVector();
const auto& batch_data_vec = batch.data.HostVector();
// Last offset stored so far; the appended entries are written starting at
// this index of data_vec.
size_t top = offset_vec.back();
data_vec.resize(top + batch.data.Size());
// Bulk-copy all entries of the incoming page behind the existing ones.
std::memcpy(dmlc::BeginPtr(data_vec) + top,
dmlc::BeginPtr(batch_data_vec),
sizeof(Entry) * batch.data.Size());
// Append one end-offset per incoming row, shifted by `top`; the incoming
// page's first offset (batch_offset_vec[0]) is not copied.
size_t begin = offset.Size();
offset_vec.resize(begin + batch.Size());
for (size_t i = 0; i < batch.Size(); ++i) {
offset_vec[i + begin] = top + batch_offset_vec[i + 1];
}
}
void Push(const SparsePage &batch);
/*!
* \brief Push a SparsePage stored in CSC format
* \param batch The row batch to be pushed
*/
void PushCSC(const SparsePage& batch);
/*!
* \brief Push one instance into page
* \param inst an instance row
Expand Down
26 changes: 13 additions & 13 deletions src/common/group_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace common {
* \tparam ValueType type of entries in the sparse matrix
* \tparam SizeType type of the index range holder
*/
template<typename ValueType, typename SizeType = size_t>
template<typename ValueType, typename SizeType = std::size_t>
struct ParallelGroupBuilder {
public:
// parallel group builder of data
Expand All @@ -44,9 +44,9 @@ struct ParallelGroupBuilder {
* \param nkeys number of keys in the matrix, can be smaller than expected
* \param nthread number of thread that will be used in construction
*/
inline void InitBudget(size_t nkeys, int nthread) {
inline void InitBudget(std::size_t nkeys, int nthread) {
thread_rptr_.resize(nthread);
for (size_t i = 0; i < thread_rptr_.size(); ++i) {
for (std::size_t i = 0; i < thread_rptr_.size(); ++i) {
thread_rptr_[i].resize(nkeys);
std::fill(thread_rptr_[i].begin(), thread_rptr_[i].end(), 0);
}
Expand All @@ -57,7 +57,7 @@ struct ParallelGroupBuilder {
* \param threadid the id of thread that calls this function
* \param nelem number of element budget add to this row
*/
inline void AddBudget(size_t key, int threadid, SizeType nelem = 1) {
inline void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) {
std::vector<SizeType> &trptr = thread_rptr_[threadid];
if (trptr.size() < key + 1) {
trptr.resize(key + 1, 0);
Expand All @@ -67,23 +67,23 @@ struct ParallelGroupBuilder {
/*! \brief step 3: initialize the necessary storage */
inline void InitStorage() {
// set rptr to correct size
for (size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
if (rptr_.size() <= thread_rptr_[tid].size()) {
rptr_.resize(thread_rptr_[tid].size() + 1);
rptr_.resize(thread_rptr_[tid].size() + 1); // key + 1
}
}
// initialize rptr to be beginning of each segment
size_t start = 0;
for (size_t i = 0; i + 1 < rptr_.size(); ++i) {
for (size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
std::size_t start = 0;
for (std::size_t i = 0; i + 1 < rptr_.size(); ++i) {
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
std::vector<SizeType> &trptr = thread_rptr_[tid];
if (i < trptr.size()) {
size_t ncnt = trptr[i];
if (i < trptr.size()) { // i^th row is assigned for this thread
std::size_t ncnt = trptr[i]; // how many entries in this row
trptr[i] = start;
start += ncnt;
}
}
rptr_[i + 1] = start;
rptr_[i + 1] = start; // pointer accumulated from all thread
}
data_.resize(start);
}
Expand All @@ -95,7 +95,7 @@ struct ParallelGroupBuilder {
* \param value The value to be pushed to the group.
* \param threadid the id of thread that calls this function
*/
inline void Push(size_t key, ValueType value, int threadid) {
void Push(std::size_t key, ValueType value, int threadid) {
SizeType &rp = thread_rptr_[threadid][key];
data_[rp++] = value;
}
Expand Down
5 changes: 5 additions & 0 deletions src/common/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ struct Monitor {

LOG(CONSOLE) << "======== Monitor: " << label << " ========";
for (auto &kv : statistics_map) {
if (kv.second.count == 0) {
LOG(WARNING) <<
"Timer for " << kv.first << " did not get stopped properly.";
continue;
}
LOG(CONSOLE) << kv.first << ": " << kv.second.timer.ElapsedSeconds()
<< "s, " << kv.second.count << " calls @ "
<< std::chrono::duration_cast<std::chrono::microseconds>(
Expand Down
89 changes: 89 additions & 0 deletions src/data/data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,95 @@ data::SparsePageFormat::DecideFormat(const std::string& cache_prefix) {
}
}

/*!
 * \brief Append all rows of another page behind the rows of this page.
 * \param batch the page whose entries and row offsets are appended
 */
void SparsePage::Push(const SparsePage &batch) {
  auto& dst_entries = data.HostVector();
  auto& dst_offsets = offset.HostVector();
  const auto& src_offsets = batch.offset.HostVector();
  const auto& src_entries = batch.data.HostVector();

  // The last stored offset is the index at which the new entries start.
  const size_t base = dst_offsets.back();
  dst_entries.resize(base + batch.data.Size());
  std::memcpy(dmlc::BeginPtr(dst_entries) + base,
              dmlc::BeginPtr(src_entries),
              sizeof(Entry) * batch.data.Size());

  // One end-offset per incoming row, shifted by `base`. The incoming page's
  // leading offset (src_offsets[0]) is not copied.
  const size_t first_new = offset.Size();
  dst_offsets.resize(first_new + batch.Size());
  size_t row = 0;
  while (row < batch.Size()) {
    dst_offsets[first_new + row] = base + src_offsets[row + 1];
    ++row;
  }
}

/*!
 * \brief Append every row of a dmlc row block to this page.
 * \param batch the row block; its index array must not be null, while a null
 *        value array means every entry has the value 1.0f
 */
void SparsePage::Push(const dmlc::RowBlock<uint32_t>& batch) {
  auto& entries = data.HostVector();
  auto& row_ptr = offset.HostVector();

  // Range of entry positions covered by this block.
  const auto first = batch.offset[0];
  const auto last = batch.offset[batch.size];

  entries.reserve(data.Size() + last - first);
  row_ptr.reserve(offset.Size() + batch.size);
  CHECK(batch.index != nullptr);

  // Extend the running row pointer by the length of each incoming row.
  for (size_t row = 0; row < batch.size; ++row) {
    row_ptr.push_back(row_ptr.back() + batch.offset[row + 1] - batch.offset[row]);
  }

  // Copy the (feature index, value) pairs.
  for (size_t pos = first; pos < last; ++pos) {
    uint32_t index = batch.index[pos];
    bst_float fvalue;
    if (batch.value == nullptr) {
      fvalue = 1.0f;
    } else {
      fvalue = batch.value[pos];
    }
    entries.emplace_back(index, fvalue);
  }

  // The final row pointer has to match the number of stored entries.
  CHECK_EQ(row_ptr.back(), data.Size());
}

void SparsePage::PushCSC(const SparsePage &batch) {
std::vector<xgboost::Entry>& self_data = data.HostVector();
std::vector<size_t>& self_offset = offset.HostVector();

auto const& other_data = batch.data.ConstHostVector();
auto const& other_offset = batch.offset.ConstHostVector();

if (other_data.empty()) {
return;
}
if (!self_data.empty()) {
CHECK_EQ(self_offset.size(), other_offset.size())
<< "self_data.size(): " << this->data.Size() << ", "
<< "other_data.size(): " << other_data.size() << std::flush;
} else {
self_data = other_data;
self_offset = other_offset;
return;
}

std::vector<size_t> offset(other_offset.size());
offset[0] = 0;

std::vector<xgboost::Entry> data(self_data.size() + batch.data.Size());

// n_cols in original csr data matrix, here in csc is n_rows
size_t const n_features = other_offset.size() - 1;
size_t beg = 0;
size_t ptr = 1;
for (size_t i = 0; i < n_features; ++i) {
size_t const self_beg = self_offset.at(i);
size_t const self_length = self_offset.at(i+1) - self_beg;
CHECK_LT(beg, data.size());
std::memcpy(dmlc::BeginPtr(data)+beg,
dmlc::BeginPtr(self_data) + self_beg,
sizeof(Entry) * self_length);
beg += self_length;

size_t const other_beg = other_offset.at(i);
size_t const other_length = other_offset.at(i+1) - other_beg;
CHECK_LT(beg, data.size());
std::memcpy(dmlc::BeginPtr(data)+beg,
dmlc::BeginPtr(other_data) + other_beg,
sizeof(Entry) * other_length);
beg += other_length;

CHECK_LT(ptr, offset.size());
offset.at(ptr) = beg;
ptr++;
}

self_data = std::move(data);
self_offset = std::move(offset);
}

namespace data {
// List of files that will be force linked in static links.
DMLC_REGISTRY_LINK_TAG(sparse_page_raw_format);
Expand Down
11 changes: 6 additions & 5 deletions src/data/sparse_page_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ bool SparsePageSource::CacheExist(const std::string& cache_info,
}

void SparsePageSource::CreateRowPage(dmlc::Parser<uint32_t>* src,
const std::string& cache_info) {
const std::string& cache_info) {
const std::string page_type = ".row.page";
std::vector<std::string> cache_shards = GetCacheShards(cache_info);
CHECK_NE(cache_shards.size(), 0U);
Expand Down Expand Up @@ -216,7 +216,8 @@ void SparsePageSource::CreateRowPage(dmlc::Parser<uint32_t>* src,
CHECK(info.qids_.empty() || info.qids_.size() == info.num_row_);
info.SaveBinary(fo.get());
}
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info;
LOG(CONSOLE) << "SparsePageSource::CreateRowPage Finished writing to "
<< name_info;
}

void SparsePageSource::CreatePageFromDMatrix(DMatrix* src,
Expand Down Expand Up @@ -246,9 +247,9 @@ void SparsePageSource::CreatePageFromDMatrix(DMatrix* src,
} else if (page_type == ".col.page") {
page->Push(batch.GetTranspose(src->Info().num_col_));
} else if (page_type == ".sorted.col.page") {
auto tmp = batch.GetTranspose(src->Info().num_col_);
tmp.SortRows();
page->Push(tmp);
SparsePage tmp = batch.GetTranspose(src->Info().num_col_);
page->PushCSC(tmp);
page->SortRows();
} else {
LOG(FATAL) << "Unknown page type: " << page_type;
}
Expand Down
1 change: 1 addition & 0 deletions src/tree/updater_basemaker-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class BaseMaker: public TreeUpdater {
for (bst_uint fid = 0; fid < batch.Size(); ++fid) {
auto c = batch[fid];
if (c.size() != 0) {
CHECK_LT(fid * 2, fminmax_.size());
fminmax_[fid * 2 + 0] =
std::max(-c[0].fvalue, fminmax_[fid * 2 + 0]);
fminmax_[fid * 2 + 1] =
Expand Down
55 changes: 55 additions & 0 deletions tests/cpp/data/test_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include <gtest/gtest.h>
#include <vector>

#include "xgboost/data.h"

namespace xgboost {
// Pushes the same two-column CSC page into an empty page twice. The first push
// must yield an exact copy. The second must double every offset and place the
// two copies of each column back to back.
TEST(SparsePage, PushCSC) {
  // Start from an empty page: a single leading offset and no entries.
  std::vector<size_t> expected_offset {0};
  std::vector<Entry> expected_data;
  SparsePage page;
  page.offset.HostVector() = expected_offset;
  page.data.HostVector() = expected_data;

  // Two columns: column 0 holds entry 0, column 1 holds entries 1..3.
  expected_offset = {0, 1, 4};
  for (size_t idx = 0; idx < expected_offset.back(); ++idx) {
    expected_data.emplace_back(idx, 0.1f);
  }

  SparsePage other;
  other.offset.HostVector() = expected_offset;
  other.data.HostVector() = expected_data;

  // First push into an empty page: the result is a copy of `other`.
  page.PushCSC(other);

  ASSERT_EQ(page.offset.HostVector().size(), expected_offset.size());
  ASSERT_EQ(page.data.HostVector().size(), expected_data.size());
  for (size_t k = 0; k < expected_offset.size(); ++k) {
    ASSERT_EQ(page.offset.HostVector()[k], expected_offset[k]);
  }
  for (size_t k = 0; k < expected_data.size(); ++k) {
    ASSERT_EQ(page.data.HostVector()[k].index, expected_data[k].index);
  }

  // Second push: same number of columns, twice as many entries.
  page.PushCSC(other);
  ASSERT_EQ(page.offset.HostVector().size(), expected_offset.size());
  ASSERT_EQ(page.data.Size(), expected_data.size() * 2);

  for (size_t k = 0; k < expected_offset.size(); ++k) {
    ASSERT_EQ(page.offset.HostVector()[k], expected_offset[k] * 2);
  }

  // Column 0 now contains entry 0 twice.
  auto inst = page[0];
  ASSERT_EQ(inst.size(), 2);
  for (auto entry : inst) {
    ASSERT_EQ(entry.index, 0);
  }

  // Column 1 contains entries 1, 2, 3 followed by 1, 2, 3 again.
  inst = page[1];
  ASSERT_EQ(inst.size(), 6);
  std::vector<size_t> indices_sol {1, 2, 3};
  for (size_t k = 0; k < inst.size(); ++k) {
    ASSERT_EQ(inst[k].index, indices_sol[k % 3]);
  }
}
}  // namespace xgboost
23 changes: 23 additions & 0 deletions tests/cpp/test_learner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <vector>
#include "helpers.h"
#include "xgboost/learner.h"
#include "dmlc/filesystem.h"

namespace xgboost {

Expand Down Expand Up @@ -92,4 +93,26 @@ TEST(Learner, CheckGroup) {
delete pp_mat;
}

// Trains one boosting iteration on an external-memory DMatrix that is large
// enough to be split into more than one row page.
TEST(Learner, SLOW_CheckMultiBatch) {
  using Arg = std::pair<std::string, std::string>;

  // Create sufficiently large data to make two row pages
  dmlc::TemporaryDirectory tempdir;
  const std::string tmp_file = tempdir.path + "/big.libsvm";
  CreateBigTestData(tmp_file, 5000000);

  // "<file>#<file>.cache" is the URI handed to DMatrix::Load; the test then
  // expects a "<file>.cache.row.page" file to exist.
  const std::string cache_uri = tmp_file + "#" + tmp_file + ".cache";
  std::shared_ptr<DMatrix> dmat(xgboost::DMatrix::Load(cache_uri, true, false));
  EXPECT_TRUE(FileExists(tmp_file + ".cache.row.page"));
  EXPECT_FALSE(dmat->SingleColBlock());

  // Alternate 0/1 labels so the binary objective has both classes.
  const size_t num_row = dmat->Info().num_row_;
  std::vector<bst_float> labels(num_row);
  for (size_t row = 0; row < num_row; ++row) {
    labels[row] = row % 2;
  }
  dmat->Info().SetInfo("label", labels.data(), DataType::kFloat32, num_row);

  std::vector<std::shared_ptr<DMatrix>> mat{dmat};
  std::unique_ptr<Learner> learner{Learner::Create(mat)};
  learner->Configure({Arg{"objective", "binary:logistic"}});
  learner->InitModel();
  learner->UpdateOneIter(0, dmat.get());
}

} // namespace xgboost