Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Placement Group] Fix placement group high cpu usage part 1 #18652

Merged
merged 24 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace gcs {
class MockGcsScheduleStrategy : public GcsScheduleStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<ray::BundleSpecification>> & bundles,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
Expand All @@ -79,7 +79,7 @@ namespace gcs {
class MockGcsPackStrategy : public GcsPackStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<ray::BundleSpecification>> & bundles,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
Expand All @@ -94,7 +94,7 @@ namespace gcs {
class MockGcsSpreadStrategy : public GcsSpreadStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<ray::BundleSpecification>> & bundles,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
Expand All @@ -109,7 +109,7 @@ namespace gcs {
class MockGcsStrictPackStrategy : public GcsStrictPackStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<ray::BundleSpecification>> & bundles,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
Expand All @@ -124,7 +124,7 @@ namespace gcs {
class MockGcsStrictSpreadStrategy : public GcsStrictSpreadStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<ray::BundleSpecification>> & bundles,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
Expand Down
4 changes: 2 additions & 2 deletions src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class MockResourceReserveInterface : public ResourceReserveInterface {
(override));
MOCK_METHOD(
void, CancelResourceReserve,
(BundleSpecification & bundle_spec,
(const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback),
(override));
MOCK_METHOD(void, ReleaseUnusedBundles,
Expand Down Expand Up @@ -164,7 +164,7 @@ class MockRayletClient : public RayletClient {
(override));
MOCK_METHOD(
void, CancelResourceReserve,
(BundleSpecification & bundle_spec,
(const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback),
(override));
MOCK_METHOD(void, ReleaseUnusedBundles,
Expand Down
26 changes: 20 additions & 6 deletions src/ray/common/bundle_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ PlacementGroupID BundleSpecification::PlacementGroupId() const {
return PlacementGroupID::FromBinary(message_->bundle_id().placement_group_id());
}

const NodeID BundleSpecification::NodeId() const {
return NodeID::FromBinary(message_->node_id());
}

int64_t BundleSpecification::Index() const {
return message_->bundle_id().bundle_index();
}
Expand All @@ -90,12 +94,22 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na
const PlacementGroupID &group_id,
int64_t bundle_index) {
std::string str;
const auto &group_id_hex = group_id.Hex();
if (bundle_index >= 0) {
str = original_resource_name + "_group_" + std::to_string(bundle_index) + "_" +
group_id.Hex();
const auto &bundle_index_str = std::to_string(bundle_index);
str.reserve(original_resource_name.size() + kGroupKeywordSize +
bundle_index_str.size() + 1 + group_id_hex.size());
str += original_resource_name;
str += kGroupKeyword;
str += bundle_index_str;
str += "_";
str += group_id_hex;
} else {
RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index;
str = original_resource_name + "_group_" + group_id.Hex();
str.reserve(original_resource_name.size() + kGroupKeywordSize + group_id_hex.size());
str += original_resource_name;
str += kGroupKeyword;
str += group_id_hex;
}
RAY_CHECK(GetOriginalResourceName(str) == original_resource_name) << str;
return str;
Expand All @@ -109,12 +123,12 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na

bool IsBundleIndex(const std::string &resource, const PlacementGroupID &group_id,
const int bundle_index) {
return resource.find("_group_" + std::to_string(bundle_index) + "_" + group_id.Hex()) !=
std::string::npos;
return resource.find(kGroupKeyword + std::to_string(bundle_index) + "_" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think std::to_string here will benefit from this (#18538)

Are you seeing anything different after that PR merged?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group_id.Hex()) != std::string::npos;
}

std::string GetOriginalResourceName(const std::string &resource) {
auto idx = resource.find("_group_");
auto idx = resource.find(kGroupKeyword);
RAY_CHECK(idx >= 0) << "This isn't a placement group resource " << resource;
return resource.substr(0, idx);
}
Expand Down
6 changes: 6 additions & 0 deletions src/ray/common/bundle_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ typedef std::function<void(const ResourceIdSet &)> ScheduleBundleCallback;
/// address and the raylet's port.
typedef std::function<void()> SpillbackBundleCallback;

const std::string kGroupKeyword = "_group_";
const size_t kGroupKeywordSize = kGroupKeyword.size();

class BundleSpecification : public MessageWrapper<rpc::Bundle> {
public:
/// Construct from a protobuf message object.
Expand All @@ -54,6 +57,9 @@ class BundleSpecification : public MessageWrapper<rpc::Bundle> {
// Return the Placement Group id which the Bundle belong to.
PlacementGroupID PlacementGroupId() const;

// Get a node ID that this bundle is scheduled on.
const NodeID NodeId() const;

// Return the index of the bundle.
int64_t Index() const;

Expand Down
1 change: 1 addition & 0 deletions src/ray/common/id.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ std::string BaseID<T>::Hex() const {
constexpr char hex[] = "0123456789abcdef";
const uint8_t *id = Data();
std::string result;
result.reserve(T::Size());
for (size_t i = 0; i < T::Size(); i++) {
unsigned int val = id[i];
result.push_back(hex[val >> 4]);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ RAY_CONFIG(int64_t, max_resource_shapes_per_load_report, 100)
RAY_CONFIG(bool, report_worker_backlog, true)

/// The timeout for synchronous GCS requests in seconds.
RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5)
RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 20)

/// Whether to enable worker prestarting: https://github.com/ray-project/ray/issues/12052
RAY_CONFIG(bool, enable_worker_prestart, true)
Expand Down
51 changes: 30 additions & 21 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,26 @@ std::string GcsPlacementGroup::GetRayNamespace() const {
return placement_group_table_data_.ray_namespace();
}

std::vector<std::shared_ptr<BundleSpecification>> GcsPlacementGroup::GetBundles() const {
const auto &bundles = placement_group_table_data_.bundles();
std::vector<std::shared_ptr<BundleSpecification>> ret_bundles;
for (const auto &bundle : bundles) {
ret_bundles.push_back(std::make_shared<BundleSpecification>(bundle));
const std::vector<std::shared_ptr<const BundleSpecification>>
&GcsPlacementGroup::GetBundles() const {
// Fill the cache if it wasn't.
if (cached_bundle_specs_.empty()) {
const auto &bundles = placement_group_table_data_.bundles();
for (const auto &bundle : bundles) {
cached_bundle_specs_.push_back(std::make_shared<const BundleSpecification>(bundle));
}
}
return ret_bundles;
return cached_bundle_specs_;
}

std::vector<std::shared_ptr<BundleSpecification>> GcsPlacementGroup::GetUnplacedBundles()
const {
const auto &bundles = placement_group_table_data_.bundles();
std::vector<std::shared_ptr<BundleSpecification>> unplaced_bundles;
for (const auto &bundle : bundles) {
if (NodeID::FromBinary(bundle.node_id()).IsNil()) {
unplaced_bundles.push_back(std::make_shared<BundleSpecification>(bundle));
std::vector<std::shared_ptr<const BundleSpecification>>
GcsPlacementGroup::GetUnplacedBundles() const {
const auto &bundle_specs = GetBundles();

std::vector<std::shared_ptr<const BundleSpecification>> unplaced_bundles;
for (const auto &bundle : bundle_specs) {
if (bundle->NodeId().IsNil()) {
unplaced_bundles.push_back(bundle);
}
}
return unplaced_bundles;
Expand All @@ -83,6 +87,8 @@ std::string GcsPlacementGroup::DebugString() const {
}

rpc::Bundle *GcsPlacementGroup::GetMutableBundle(int bundle_index) {
// Invalidate the cache.
cached_bundle_specs_.clear();
return placement_group_table_data_.mutable_bundles(bundle_index);
}

Expand Down Expand Up @@ -242,7 +248,8 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
}

MarkSchedulingDone();
RetryCreatingPlacementGroup();
RetryCreatingPlacementGroup(
RayConfig::instance().gcs_create_placement_group_retry_interval_ms());
}

void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
Expand All @@ -258,7 +265,7 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
[this, placement_group_id](Status status) {
RAY_CHECK_OK(status);

SchedulePendingPlacementGroups();
RetryCreatingPlacementGroup(0);

// Invoke all callbacks for all `WaitPlacementGroupUntilReady` requests of this
// placement group and remove all of them from
Expand Down Expand Up @@ -306,6 +313,7 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
}
// If the placement group is not registered == removed.
}
++counts_[CountType::SCHEDULING_PENDING_PLACEMENT_GROUP];
}

void GcsPlacementGroupManager::HandleCreatePlacementGroup(
Expand Down Expand Up @@ -556,9 +564,8 @@ void GcsPlacementGroupManager::WaitPlacementGroup(
}
}

void GcsPlacementGroupManager::RetryCreatingPlacementGroup() {
execute_after(io_context_, [this] { SchedulePendingPlacementGroups(); },
RayConfig::instance().gcs_create_placement_group_retry_interval_ms());
void GcsPlacementGroupManager::RetryCreatingPlacementGroup(uint32_t delay_ms) {
execute_after(io_context_, [this] { SchedulePendingPlacementGroups(); }, delay_ms);
}

void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
Expand All @@ -581,7 +588,7 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
}
}

SchedulePendingPlacementGroups();
RetryCreatingPlacementGroup(0);
}

void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
Expand Down Expand Up @@ -633,7 +640,7 @@ void GcsPlacementGroupManager::Tick() {
// To avoid scheduling exhaution in some race conditions.
// Note that we don't currently have a known race condition that requires this, but we
// added as a safety check. https://github.com/ray-project/ray/pull/18419
SchedulePendingPlacementGroups();
RetryCreatingPlacementGroup(0);
execute_after(io_context_, [this] { Tick(); }, 1000 /* milliseconds */);
}

Expand Down Expand Up @@ -684,7 +691,7 @@ void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
// Notify raylets to release unused bundles.
gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles);

SchedulePendingPlacementGroups();
RetryCreatingPlacementGroup(0);
}

std::string GcsPlacementGroupManager::DebugString() const {
Expand All @@ -705,6 +712,8 @@ std::string GcsPlacementGroupManager::DebugString() const {
<< counts_[CountType::WAIT_PLACEMENT_GROUP_UNTIL_READY_REQUEST]
<< ", GetNamedPlacementGroup request count: "
<< counts_[CountType::GET_NAMED_PLACEMENT_GROUP_REQUEST]
<< ", Scheduling pending placement group count: "
<< counts_[CountType::SCHEDULING_PENDING_PLACEMENT_GROUP]
<< ", Registered placement groups count: " << registered_placement_groups_.size()
<< ", Named placement group count: " << num_pgs
<< ", Pending placement groups count: " << pending_placement_groups_.size()
Expand Down
15 changes: 11 additions & 4 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#pragma once
#include <gtest/gtest_prod.h>
#include <utility>

#include "absl/container/flat_hash_map.h"
Expand Down Expand Up @@ -89,10 +90,10 @@ class GcsPlacementGroup {
std::string GetRayNamespace() const;

/// Get the bundles of this placement_group (including unplaced).
std::vector<std::shared_ptr<BundleSpecification>> GetBundles() const;
const std::vector<std::shared_ptr<const BundleSpecification>> &GetBundles() const;

/// Get the unplaced bundles of this placement group.
std::vector<std::shared_ptr<BundleSpecification>> GetUnplacedBundles() const;
std::vector<std::shared_ptr<const BundleSpecification>> GetUnplacedBundles() const;

/// Get the Strategy
rpc::PlacementStrategy GetStrategy() const;
Expand Down Expand Up @@ -121,9 +122,14 @@ class GcsPlacementGroup {
bool IsDetached() const;

private:
FRIEND_TEST(GcsPlacementGroupManagerTest, TestPlacementGroupBundleCache);
/// The placement_group meta data which contains the task specification as well as the
/// state of the gcs placement_group and so on (see gcs.proto).
rpc::PlacementGroupTableData placement_group_table_data_;
/// Creating bundle specification requires heavy computation because it needs to compute
/// formatted strings for all resources (heavy string operations). To optimize the CPU
/// usage, we cache bundle specs.
mutable std::vector<std::shared_ptr<const BundleSpecification>> cached_bundle_specs_;
};

/// GcsPlacementGroupManager is responsible for managing the lifecycle of all placement
Expand Down Expand Up @@ -270,7 +276,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {

private:
/// Try to create placement group after a short time.
void RetryCreatingPlacementGroup();
void RetryCreatingPlacementGroup(uint32_t delay_ms);

/// Mark the manager that there's a placement group scheduling going on.
void MarkSchedulingStarted(const PlacementGroupID placement_group_id) {
Expand Down Expand Up @@ -352,7 +358,8 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
GET_ALL_PLACEMENT_GROUP_REQUEST = 3,
WAIT_PLACEMENT_GROUP_UNTIL_READY_REQUEST = 4,
GET_NAMED_PLACEMENT_GROUP_REQUEST = 5,
CountType_MAX = 6,
SCHEDULING_PENDING_PLACEMENT_GROUP = 6,
CountType_MAX = 7,
};
uint64_t counts_[CountType::CountType_MAX] = {0};
};
Expand Down
Loading