Skip to content

Commit

Permalink
Support vertical federated learning (#8932)
Browse files Browse the repository at this point in the history
  • Loading branch information
rongou authored Mar 22, 2023
1 parent 8dc1e4b commit b240f05
Show file tree
Hide file tree
Showing 23 changed files with 371 additions and 249 deletions.
26 changes: 20 additions & 6 deletions include/xgboost/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ class MetaInfo {
*/
void Extend(MetaInfo const& that, bool accumulate_rows, bool check_column);

/**
* @brief Synchronize the number of columns across all workers.
*
* Normally we just need to find the maximum number of columns across all workers, but
* in vertical federated learning, since each worker loads its own list of columns,
* we need to sum them.
*/
void SynchronizeNumberOfColumns();

private:
void SetInfoFromHost(Context const& ctx, StringView key, Json arr);
void SetInfoFromCUDA(Context const& ctx, StringView key, Json arr);
Expand Down Expand Up @@ -325,6 +334,10 @@ class SparsePage {
* \brief Check wether the column index is sorted.
*/
bool IsIndicesSorted(int32_t n_threads) const;
/**
* \brief Reindex the column index with an offset.
*/
void Reindex(uint64_t feature_offset, int32_t n_threads);

void SortRows(int32_t n_threads);

Expand Down Expand Up @@ -559,17 +572,18 @@ class DMatrix {
* \brief Creates a new DMatrix from an external data adapter.
*
* \tparam AdapterT Type of the adapter.
* \param [in,out] adapter View onto an external data.
* \param missing Values to count as missing.
* \param nthread Number of threads for construction.
* \param cache_prefix (Optional) The cache prefix for external memory.
* \param page_size (Optional) Size of the page.
* \param [in,out] adapter View onto an external data.
* \param missing Values to count as missing.
* \param nthread Number of threads for construction.
* \param cache_prefix (Optional) The cache prefix for external memory.
* \param data_split_mode (Optional) Data split mode.
*
* \return a Created DMatrix.
*/
template <typename AdapterT>
static DMatrix* Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix = "");
const std::string& cache_prefix = "",
DataSplitMode data_split_mode = DataSplitMode::kRow);

/**
* \brief Create a new Quantile based DMatrix used for histogram based algorithm.
Expand Down
57 changes: 38 additions & 19 deletions src/data/data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,14 @@ void MetaInfo::Extend(MetaInfo const& that, bool accumulate_rows, bool check_col
}
}

void MetaInfo::SynchronizeNumberOfColumns() {
if (collective::IsFederated() && data_split_mode == DataSplitMode::kCol) {
collective::Allreduce<collective::Operation::kSum>(&num_col_, 1);
} else {
collective::Allreduce<collective::Operation::kMax>(&num_col_, 1);
}
}

void MetaInfo::Validate(std::int32_t device) const {
if (group_ptr_.size() != 0 && weights_.Size() != 0) {
CHECK_EQ(group_ptr_.size(), weights_.Size() + 1)
Expand Down Expand Up @@ -870,7 +878,7 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
dmlc::Parser<uint32_t>::Create(fname.c_str(), partid, npart, file_format.c_str()));
data::FileAdapter adapter(parser.get());
dmat = DMatrix::Create(&adapter, std::numeric_limits<float>::quiet_NaN(), Context{}.Threads(),
cache_file);
cache_file, data_split_mode);
} else {
data::FileIterator iter{fname, static_cast<uint32_t>(partid), static_cast<uint32_t>(npart),
file_format};
Expand Down Expand Up @@ -906,11 +914,6 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
LOG(FATAL) << "Encountered parser error:\n" << e.what();
}

/* sync up number of features after matrix loaded.
* partitioned data will fail the train/val validation check
* since partitioned data not knowing the real number of features. */
collective::Allreduce<collective::Operation::kMax>(&dmat->Info().num_col_, 1);

if (need_split && data_split_mode == DataSplitMode::kCol) {
if (!cache_file.empty()) {
LOG(FATAL) << "Column-wise data split is not support for external memory.";
Expand All @@ -920,7 +923,6 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
delete dmat;
return sliced;
} else {
dmat->Info().data_split_mode = data_split_mode;
return dmat;
}
}
Expand Down Expand Up @@ -957,39 +959,49 @@ template DMatrix *DMatrix::Create<DataIterHandle, DMatrixHandle,
XGDMatrixCallbackNext *next, float missing, int32_t n_threads, std::string);

template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread, const std::string&) {
return new data::SimpleDMatrix(adapter, missing, nthread);
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread, const std::string&,
DataSplitMode data_split_mode) {
return new data::SimpleDMatrix(adapter, missing, nthread, data_split_mode);
}

template DMatrix* DMatrix::Create<data::DenseAdapter>(data::DenseAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::ArrayAdapter>(data::ArrayAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSRAdapter>(data::CSRAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSCAdapter>(data::CSCAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::DataTableAdapter>(data::DataTableAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::FileAdapter>(data::FileAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSRArrayAdapter>(data::CSRArrayAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSCArrayAdapter>(data::CSCArrayAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create(
data::IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>* adapter,
float missing, int nthread, const std::string& cache_prefix);
float missing, int nthread, const std::string& cache_prefix, DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::RecordBatchesIterAdapter>(
data::RecordBatchesIterAdapter* adapter, float missing, int nthread, const std::string&);
data::RecordBatchesIterAdapter* adapter, float missing, int nthread, const std::string&,
DataSplitMode data_split_mode);

SparsePage SparsePage::GetTranspose(int num_columns, int32_t n_threads) const {
SparsePage transpose;
Expand Down Expand Up @@ -1051,6 +1063,13 @@ void SparsePage::SortIndices(int32_t n_threads) {
});
}

void SparsePage::Reindex(uint64_t feature_offset, int32_t n_threads) {
auto& h_data = this->data.HostVector();
common::ParallelFor(h_data.size(), n_threads, [&](auto i) {
h_data[i].index += feature_offset;
});
}

void SparsePage::SortRows(int32_t n_threads) {
auto& h_offset = this->offset.HostVector();
auto& h_data = this->data.HostVector();
Expand Down
8 changes: 4 additions & 4 deletions src/data/data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,17 @@ void MetaInfo::SetInfoFromCUDA(Context const& ctx, StringView key, Json array) {

template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix) {
const std::string& cache_prefix, DataSplitMode data_split_mode) {
CHECK_EQ(cache_prefix.size(), 0)
<< "Device memory construction is not currently supported with external "
"memory.";
return new data::SimpleDMatrix(adapter, missing, nthread);
return new data::SimpleDMatrix(adapter, missing, nthread, data_split_mode);
}

template DMatrix* DMatrix::Create<data::CudfAdapter>(
data::CudfAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix);
const std::string& cache_prefix, DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CupyAdapter>(
data::CupyAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix);
const std::string& cache_prefix, DataSplitMode data_split_mode);
} // namespace xgboost
2 changes: 1 addition & 1 deletion src/data/iterative_dmatrix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
// From here on Info() has the correct data shape
Info().num_row_ = accumulated_rows;
Info().num_nonzero_ = nnz;
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
Info().SynchronizeNumberOfColumns();
CHECK(std::none_of(column_sizes.cbegin(), column_sizes.cend(), [&](auto f) {
return f > accumulated_rows;
})) << "Something went wrong during iteration.";
Expand Down
2 changes: 1 addition & 1 deletion src/data/iterative_dmatrix.cu
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing,

iter.Reset();
// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.SynchronizeNumberOfColumns();
}

BatchSet<EllpackPage> IterativeDMatrix::GetEllpackBatches(BatchParam const& param) {
Expand Down
56 changes: 42 additions & 14 deletions src/data/simple_dmatrix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ DMatrix* SimpleDMatrix::SliceCol(int num_slices, int slice_id) {
return out;
}

void SimpleDMatrix::ReindexFeatures() {
if (collective::IsFederated() && info_.data_split_mode == DataSplitMode::kCol) {
std::vector<uint64_t> buffer(collective::GetWorldSize());
buffer[collective::GetRank()] = info_.num_col_;
collective::Allgather(buffer.data(), buffer.size() * sizeof(uint64_t));
auto offset = std::accumulate(buffer.cbegin(), buffer.cbegin() + collective::GetRank(), 0);
if (offset == 0) {
return;
}
sparse_page_->Reindex(offset, ctx_.Threads());
}
}

BatchSet<SparsePage> SimpleDMatrix::GetRowBatches() {
// since csr is the default data structure so `source_` is always available.
auto begin_iter = BatchIterator<SparsePage>(
Expand Down Expand Up @@ -151,7 +164,8 @@ BatchSet<ExtSparsePage> SimpleDMatrix::GetExtBatches(BatchParam const&) {
}

template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
DataSplitMode data_split_mode) {
this->ctx_.nthread = nthread;

std::vector<uint64_t> qids;
Expand Down Expand Up @@ -217,7 +231,9 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {


// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
ReindexFeatures();
info_.SynchronizeNumberOfColumns();

if (adapter->NumRows() == kAdapterUnknownSize) {
using IteratorAdapterT
Expand Down Expand Up @@ -272,22 +288,31 @@ void SimpleDMatrix::SaveToLocalFile(const std::string& fname) {
fo->Write(sparse_page_->data.HostVector());
}

template SimpleDMatrix::SimpleDMatrix(DenseAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(ArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSRAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSRArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSCArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSCAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(DataTableAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(FileAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(DenseAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(ArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSRAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSRArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSCArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSCAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(DataTableAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(FileAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(
IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>
*adapter,
float missing, int nthread);
float missing, int nthread, DataSplitMode data_split_mode);

template <>
SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, int nthread) {
ctx_.nthread = nthread;
SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode) {
ctx_.nthread = nthread;

auto& offset_vec = sparse_page_->offset.HostVector();
auto& data_vec = sparse_page_->data.HostVector();
Expand Down Expand Up @@ -346,7 +371,10 @@ SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, i
}
// Synchronise worker columns
info_.num_col_ = adapter->NumColumns();
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
ReindexFeatures();
info_.SynchronizeNumberOfColumns();

info_.num_row_ = total_batch_size;
info_.num_nonzero_ = data_vec.size();
CHECK_EQ(offset_vec.back(), info_.num_nonzero_);
Expand Down
12 changes: 8 additions & 4 deletions src/data/simple_dmatrix.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ namespace data {
// Current implementation assumes a single batch. More batches can
// be supported in future. Does not currently support inferring row/column size
template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread*/) {
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread*/,
DataSplitMode data_split_mode) {
CHECK(data_split_mode != DataSplitMode::kCol)
<< "Column-wise data split is currently not supported on the GPU.";
auto device = (adapter->DeviceIdx() < 0 || adapter->NumRows() == 0) ? dh::CurrentDevice()
: adapter->DeviceIdx();
CHECK_GE(device, 0);
Expand All @@ -35,12 +38,13 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread
info_.num_col_ = adapter->NumColumns();
info_.num_row_ = adapter->NumRows();
// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
info_.SynchronizeNumberOfColumns();
}

template SimpleDMatrix::SimpleDMatrix(CudfAdapter* adapter, float missing,
int nthread);
int nthread, DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CupyAdapter* adapter, float missing,
int nthread);
int nthread, DataSplitMode data_split_mode);
} // namespace data
} // namespace xgboost
12 changes: 11 additions & 1 deletion src/data/simple_dmatrix.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class SimpleDMatrix : public DMatrix {
public:
SimpleDMatrix() = default;
template <typename AdapterT>
explicit SimpleDMatrix(AdapterT* adapter, float missing, int nthread);
explicit SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
DataSplitMode data_split_mode = DataSplitMode::kRow);

explicit SimpleDMatrix(dmlc::Stream* in_stream);
~SimpleDMatrix() override = default;
Expand Down Expand Up @@ -61,6 +62,15 @@ class SimpleDMatrix : public DMatrix {
bool GHistIndexExists() const override { return static_cast<bool>(gradient_index_); }
bool SparsePageExists() const override { return true; }

/**
* \brief Reindex the features based on a global view.
*
* In some cases (e.g. vertical federated learning), features are loaded locally with indices
* starting from 0. However, all the algorithms assume the features are globally indexed, so we
* reindex the features based on the offset needed to obtain the global view.
*/
void ReindexFeatures();

private:
Context ctx_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/data/sparse_page_dmatrix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ SparsePageDMatrix::SparsePageDMatrix(DataIterHandle iter_handle, DMatrixHandle p
this->info_.num_col_ = n_features;
this->info_.num_nonzero_ = nnz;

collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.SynchronizeNumberOfColumns();
CHECK_NE(info_.num_col_, 0);
}

Expand Down
Loading

0 comments on commit b240f05

Please sign in to comment.