
Make objectives work with vertical distributed and federated learning #9002

Merged · 11 commits · Apr 3, 2023
69 changes: 38 additions & 31 deletions src/objective/adaptive.cc
@@ -85,7 +85,7 @@ void UpdateTreeLeafHost(Context const* ctx, std::vector<bst_node_t> const& posit
size_t n_leaf = nidx.size();
if (nptr.empty()) {
std::vector<float> quantiles;
UpdateLeafValues(&quantiles, nidx, learning_rate, p_tree);
UpdateLeafValues(&quantiles, nidx, info, learning_rate, p_tree);
return;
}

@@ -99,39 +99,46 @@ void UpdateTreeLeafHost(Context const* ctx, std::vector<bst_node_t> const& posit
auto h_predt = linalg::MakeTensorView(ctx, predt.ConstHostSpan(), info.num_row_,
predt.Size() / info.num_row_);

// loop over each leaf
common::ParallelFor(quantiles.size(), ctx->Threads(), [&](size_t k) {
auto nidx = h_node_idx[k];
CHECK(tree[nidx].IsLeaf());
CHECK_LT(k + 1, h_node_ptr.size());
size_t n = h_node_ptr[k + 1] - h_node_ptr[k];
auto h_row_set = common::Span<size_t const>{ridx}.subspan(h_node_ptr[k], n);

auto h_labels = info.labels.HostView().Slice(linalg::All(), IdxY(info, group_idx));
auto h_weights = linalg::MakeVec(&info.weights_);

auto iter = common::MakeIndexTransformIter([&](size_t i) -> float {
auto row_idx = h_row_set[i];
return h_labels(row_idx) - h_predt(row_idx, group_idx);
});
auto w_it = common::MakeIndexTransformIter([&](size_t i) -> float {
auto row_idx = h_row_set[i];
return h_weights(row_idx);
if (!info.IsVerticalFederated() || collective::GetRank() == 0) {
// loop over each leaf
common::ParallelFor(quantiles.size(), ctx->Threads(), [&](size_t k) {
auto nidx = h_node_idx[k];
CHECK(tree[nidx].IsLeaf());
CHECK_LT(k + 1, h_node_ptr.size());
size_t n = h_node_ptr[k + 1] - h_node_ptr[k];
auto h_row_set = common::Span<size_t const>{ridx}.subspan(h_node_ptr[k], n);

auto h_labels = info.labels.HostView().Slice(linalg::All(), IdxY(info, group_idx));
auto h_weights = linalg::MakeVec(&info.weights_);

auto iter = common::MakeIndexTransformIter([&](size_t i) -> float {
auto row_idx = h_row_set[i];
return h_labels(row_idx) - h_predt(row_idx, group_idx);
});
auto w_it = common::MakeIndexTransformIter([&](size_t i) -> float {
auto row_idx = h_row_set[i];
return h_weights(row_idx);
});

float q{0};
if (info.weights_.Empty()) {
q = common::Quantile(ctx, alpha, iter, iter + h_row_set.size());
} else {
q = common::WeightedQuantile(ctx, alpha, iter, iter + h_row_set.size(), w_it);
}
if (std::isnan(q)) {
CHECK(h_row_set.empty());
}
quantiles.at(k) = q;
});
}

float q{0};
if (info.weights_.Empty()) {
q = common::Quantile(ctx, alpha, iter, iter + h_row_set.size());
} else {
q = common::WeightedQuantile(ctx, alpha, iter, iter + h_row_set.size(), w_it);
}
if (std::isnan(q)) {
CHECK(h_row_set.empty());
}
quantiles.at(k) = q;
});
if (info.IsVerticalFederated()) {
collective::Broadcast(static_cast<void*>(quantiles.data()), quantiles.size() * sizeof(float),
0);
}

UpdateLeafValues(&quantiles, nidx, learning_rate, p_tree);
UpdateLeafValues(&quantiles, nidx, info, learning_rate, p_tree);
}

#if !defined(XGBOOST_USE_CUDA)
4 changes: 2 additions & 2 deletions src/objective/adaptive.cu
@@ -151,7 +151,7 @@ void UpdateTreeLeafDevice(Context const* ctx, common::Span<bst_node_t const> pos

if (nptr.Empty()) {
std::vector<float> quantiles;
UpdateLeafValues(&quantiles, nidx.ConstHostVector(), learning_rate, p_tree);
UpdateLeafValues(&quantiles, nidx.ConstHostVector(), info, learning_rate, p_tree);
}

HostDeviceVector<float> quantiles;
@@ -186,7 +186,7 @@ void UpdateTreeLeafDevice(Context const* ctx, common::Span<bst_node_t const> pos
w_it + d_weights.size(), &quantiles);
}

UpdateLeafValues(&quantiles.HostVector(), nidx.ConstHostVector(), learning_rate, p_tree);
UpdateLeafValues(&quantiles.HostVector(), nidx.ConstHostVector(), info, learning_rate, p_tree);
}
} // namespace detail
} // namespace obj
14 changes: 10 additions & 4 deletions src/objective/adaptive.h
@@ -36,13 +36,15 @@ inline void FillMissingLeaf(std::vector<bst_node_t> const& maybe_missing,
}

inline void UpdateLeafValues(std::vector<float>* p_quantiles, std::vector<bst_node_t> const& nidx,
float learning_rate, RegTree* p_tree) {
MetaInfo const& info, float learning_rate, RegTree* p_tree) {
auto& tree = *p_tree;
auto& quantiles = *p_quantiles;
auto const& h_node_idx = nidx;

size_t n_leaf{h_node_idx.size()};
collective::Allreduce<collective::Operation::kMax>(&n_leaf, 1);
if (info.IsRowSplit()) {
collective::Allreduce<collective::Operation::kMax>(&n_leaf, 1);
}
CHECK(quantiles.empty() || quantiles.size() == n_leaf);
if (quantiles.empty()) {
quantiles.resize(n_leaf, std::numeric_limits<float>::quiet_NaN());
@@ -52,12 +54,16 @@ inline void UpdateLeafValues(std::vector<float>* p_quantiles, std::vector<bst_no
std::vector<int32_t> n_valids(quantiles.size());
std::transform(quantiles.cbegin(), quantiles.cend(), n_valids.begin(),
[](float q) { return static_cast<int32_t>(!std::isnan(q)); });
collective::Allreduce<collective::Operation::kSum>(n_valids.data(), n_valids.size());
if (info.IsRowSplit()) {
collective::Allreduce<collective::Operation::kSum>(n_valids.data(), n_valids.size());
}
// convert to 0 for all reduce
std::replace_if(
quantiles.begin(), quantiles.end(), [](float q) { return std::isnan(q); }, 0.f);
// use the mean value
collective::Allreduce<collective::Operation::kSum>(quantiles.data(), quantiles.size());
if (info.IsRowSplit()) {
collective::Allreduce<collective::Operation::kSum>(quantiles.data(), quantiles.size());
}
for (size_t i = 0; i < n_leaf; ++i) {
if (n_valids[i] > 0) {
quantiles[i] /= static_cast<float>(n_valids[i]);
11 changes: 8 additions & 3 deletions src/objective/quantile_obj.cu
@@ -35,7 +35,10 @@ class QuantileRegression : public ObjFunction {
bst_target_t Targets(MetaInfo const& info) const override {
auto const& alpha = param_.quantile_alpha.Get();
CHECK_EQ(alpha.size(), alpha_.Size()) << "The objective is not yet configured.";
CHECK_EQ(info.labels.Shape(1), 1) << "Multi-target is not yet supported by the quantile loss.";
if (!info.IsVerticalFederated() || collective::GetRank() == 0) {
CHECK_EQ(info.labels.Shape(1), 1)
<< "Multi-target is not yet supported by the quantile loss.";
}
CHECK(!alpha.empty());
// We have some placeholders for multi-target in the quantile loss. But it's not
// supported as the gbtree doesn't know how to slice the gradient and there's no 3-dim
@@ -167,8 +170,10 @@ class QuantileRegression : public ObjFunction {
common::Mean(ctx_, *base_score, &temp);
double meanq = temp(0) * sw;

collective::Allreduce<collective::Operation::kSum>(&meanq, 1);
collective::Allreduce<collective::Operation::kSum>(&sw, 1);
if (info.IsRowSplit()) {
Review comment (Member):

Maybe we should make a wrapper function for Allreduce so that newcomers to the code base can understand these conditions? I think we need some separation between communication logic and the machine learning algorithm implementation. I'm open to any idea; at the moment it's quite difficult for a "machine learning person" to add a new algorithm and make sure it works correctly with different distributed system requirements. I can see that these conditions are necessary for metrics as well, since they require labels. Is there a way to make this less intrusive by abstracting the logic under collective instead of exposing it to the machine learning part?

Reply (Contributor, Author):

The problem is that Allreduce is a low-level primitive, while column-wise split and vertical federated learning are higher-level semantic changes, so they don't always map to each other. At the level of the communicator, I don't think we can determine whether we should skip a particular Allreduce call, even if we know what distributed/federated environment we are in.

Maybe we can come up with a clever mechanism to make this cleaner. Perhaps as a follow-up?
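
For illustration only, a minimal sketch of the kind of wrapper being discussed, built on the Allreduce primitive and MetaInfo::IsRowSplit() check that already appear in this diff. The name AllreduceIfRowSplit and its placement are hypothetical, not part of this PR:

namespace xgboost::collective {

// Hypothetical helper: keeps the split-type condition in one place
// instead of repeating it at every objective/metric call site.
template <Operation kOp, typename T>
void AllreduceIfRowSplit(MetaInfo const& info, T* values, size_t n) {
  // Under a row-wise (horizontal) split each worker holds a disjoint
  // subset of rows, so partial results must be aggregated. Under a
  // column-wise (vertical) split every worker sees all rows, so the
  // local result is already complete and the reduction is skipped.
  if (info.IsRowSplit()) {
    Allreduce<kOp>(values, n);
  }
}

}  // namespace xgboost::collective

A call site such as the one below would then read collective::AllreduceIfRowSplit<collective::Operation::kSum>(info, &meanq, 1), leaving the objective code free of explicit split-type checks.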

collective::Allreduce<collective::Operation::kSum>(&meanq, 1);
collective::Allreduce<collective::Operation::kSum>(&sw, 1);
}
meanq /= (sw + kRtEps);
base_score->Reshape(1);
base_score->Data()->Fill(meanq);
6 changes: 4 additions & 2 deletions src/objective/regression_obj.cu
@@ -728,8 +728,10 @@ class MeanAbsoluteError : public ObjFunction {
std::transform(linalg::cbegin(out), linalg::cend(out), linalg::begin(out),
[w](float v) { return v * w; });

collective::Allreduce<collective::Operation::kSum>(out.Values().data(), out.Values().size());
collective::Allreduce<collective::Operation::kSum>(&w, 1);
if (info.IsRowSplit()) {
collective::Allreduce<collective::Operation::kSum>(out.Values().data(), out.Values().size());
collective::Allreduce<collective::Operation::kSum>(&w, 1);
}

if (common::CloseTo(w, 0.0)) {
// Mostly for handling empty dataset test.
127 changes: 76 additions & 51 deletions tests/cpp/plugin/test_federated_learner.cc
@@ -13,66 +13,91 @@

namespace xgboost {

void VerifyObjectives(size_t rows, size_t cols, std::vector<float> const &expected_base_scores,
std::vector<Json> const &expected_models) {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::shared_ptr<DMatrix> dmat{RandomDataGenerator{rows, cols, 0}.GenerateDMatrix(rank == 0)};

if (rank == 0) {
auto &h_upper = dmat->Info().labels_upper_bound_.HostVector();
auto &h_lower = dmat->Info().labels_lower_bound_.HostVector();
h_lower.resize(rows);
h_upper.resize(rows);
for (size_t i = 0; i < rows; ++i) {
h_lower[i] = 1;
h_upper[i] = 10;
}
}
std::shared_ptr<DMatrix> sliced{dmat->SliceCol(world_size, rank)};

auto i = 0;
for (auto const *entry : ::dmlc::Registry<::xgboost::ObjFunctionReg>::List()) {
std::unique_ptr<Learner> learner{Learner::Create({sliced})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", entry->name);
if (entry->name.find("quantile") != std::string::npos) {
learner->SetParam("quantile_alpha", "0.5");
}
if (entry->name.find("multi") != std::string::npos) {
learner->SetParam("num_class", "3");
}
learner->UpdateOneIter(0, sliced);

Json config{Object{}};
learner->SaveConfig(&config);
auto base_score = GetBaseScore(config);
ASSERT_EQ(base_score, expected_base_scores[i]);

Json model{Object{}};
learner->SaveModel(&model);
ASSERT_EQ(model, expected_models[i]);

i++;
}
}

class FederatedLearnerTest : public BaseFederatedTest {
protected:
static auto constexpr kRows{16};
static auto constexpr kCols{16};
};

void VerifyBaseScore(size_t rows, size_t cols, float expected_base_score) {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::shared_ptr<DMatrix> Xy_{RandomDataGenerator{rows, cols, 0}.GenerateDMatrix(rank == 0)};
std::shared_ptr<DMatrix> sliced{Xy_->SliceCol(world_size, rank)};
std::unique_ptr<Learner> learner{Learner::Create({sliced})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, sliced);
Json config{Object{}};
learner->SaveConfig(&config);
auto base_score = GetBaseScore(config);
ASSERT_EQ(base_score, expected_base_score);
}

void VerifyModel(size_t rows, size_t cols, Json const& expected_model) {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::shared_ptr<DMatrix> Xy_{RandomDataGenerator{rows, cols, 0}.GenerateDMatrix(rank == 0)};
std::shared_ptr<DMatrix> sliced{Xy_->SliceCol(world_size, rank)};
std::unique_ptr<Learner> learner{Learner::Create({sliced})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, sliced);
Json model{Object{}};
learner->SaveModel(&model);
ASSERT_EQ(model, expected_model);
}
TEST_F(FederatedLearnerTest, Objectives) {
std::shared_ptr<DMatrix> dmat{RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true)};

TEST_F(FederatedLearnerTest, BaseScore) {
std::shared_ptr<DMatrix> Xy_{RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true)};
std::unique_ptr<Learner> learner{Learner::Create({Xy_})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, Xy_);
Json config{Object{}};
learner->SaveConfig(&config);
auto base_score = GetBaseScore(config);
ASSERT_NE(base_score, ObjFunction::DefaultBaseScore());
auto &h_upper = dmat->Info().labels_upper_bound_.HostVector();
auto &h_lower = dmat->Info().labels_lower_bound_.HostVector();
h_lower.resize(kRows);
h_upper.resize(kRows);
for (size_t i = 0; i < kRows; ++i) {
h_lower[i] = 1;
h_upper[i] = 10;
}

RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyBaseScore, kRows, kCols,
base_score);
}
std::vector<float> base_scores;
std::vector<Json> models;
for (auto const *entry : ::dmlc::Registry<::xgboost::ObjFunctionReg>::List()) {
std::unique_ptr<Learner> learner{Learner::Create({dmat})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", entry->name);
if (entry->name.find("quantile") != std::string::npos) {
learner->SetParam("quantile_alpha", "0.5");
}
if (entry->name.find("multi") != std::string::npos) {
learner->SetParam("num_class", "3");
}
learner->UpdateOneIter(0, dmat);
Json config{Object{}};
learner->SaveConfig(&config);
base_scores.emplace_back(GetBaseScore(config));

TEST_F(FederatedLearnerTest, Model) {
std::shared_ptr<DMatrix> Xy_{RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true)};
std::unique_ptr<Learner> learner{Learner::Create({Xy_})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, Xy_);
Json model{Object{}};
learner->SaveModel(&model);
Json model{Object{}};
learner->SaveModel(&model);
models.emplace_back(model);
}

RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyModel, kRows, kCols,
std::cref(model));
RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyObjectives, kRows, kCols,
base_scores, models);
}
} // namespace xgboost