CM: Add on-demand cluster discovery functionality #15857
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,7 +39,7 @@ namespace Envoy { | |
namespace Upstream { | ||
|
||
/** | ||
* ClusterUpdateCallbacks provide a way to exposes Cluster lifecycle events in the | ||
* ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the | ||
* ClusterManager. | ||
*/ | ||
class ClusterUpdateCallbacks { | ||
|
@@ -72,6 +72,76 @@ class ClusterUpdateCallbacksHandle { | |
|
||
using ClusterUpdateCallbacksHandlePtr = std::unique_ptr<ClusterUpdateCallbacksHandle>; | ||
|
||
/** | ||
* Status enum for the result of an attempted cluster discovery. | ||
*/ | ||
enum class ClusterDiscoveryStatus { | ||
/** | ||
* The discovery process timed out. This means that we haven't yet received any reply from | ||
* on-demand CDS about it. | ||
*/ | ||
Timeout, | ||
/** | ||
* The discovery process has concluded and on-demand CDS has no such cluster. | ||
*/ | ||
Missing, | ||
/** | ||
* Cluster found and currently available through ClusterManager. | ||
*/ | ||
Available, | ||
}; | ||
|
||
/** | ||
* ClusterDiscoveryCallback is a callback called at the end of the on-demand cluster discovery | ||
* process. The status of the discovery is sent as a parameter. | ||
*/ | ||
using ClusterDiscoveryCallback = std::function<void(ClusterDiscoveryStatus)>; | ||
using ClusterDiscoveryCallbackPtr = std::unique_ptr<ClusterDiscoveryCallback>; | ||
|
||
/** | ||
* ClusterDiscoveryCallbackHandle is a RAII wrapper for a ClusterDiscoveryCallback. Deleting the | ||
* ClusterDiscoveryCallbackHandle will remove the callbacks from ClusterManager. | ||
*/ | ||
class ClusterDiscoveryCallbackHandle { | ||
public: | ||
virtual ~ClusterDiscoveryCallbackHandle() = default; | ||
}; | ||
|
||
using ClusterDiscoveryCallbackHandlePtr = std::unique_ptr<ClusterDiscoveryCallbackHandle>; | ||
|
||
/** | ||
* A handle to an on-demand CDS. | ||
*/ | ||
class OdCdsApiHandle { | ||
public: | ||
virtual ~OdCdsApiHandle() = default; | ||
|
||
/** | ||
* Request an on-demand discovery of a cluster with a passed name. This ODCDS may be used to | ||
* perform the discovery process in the main thread if there is no discovery going on for this | ||
* cluster. When the requested cluster is added and warmed up, the passed callback will be invoked | ||
* in the same thread that invoked this function. | ||
* | ||
* The returned handle can be destroyed to prevent the callback from being invoked. Note that the | |
* handle can only be destroyed in the same thread that invoked the function. Destroying the | |
* handle might not stop the discovery process, though. Once the callback has been invoked, | |
* destroying the handle does nothing. It is the responsibility of the caller to make sure that | |
* the objects captured in the callback outlive the callback. | |
* | ||
* This function is thread-safe. | ||
* | ||
* @param name is the name of the cluster to be discovered. | ||
* @param callback will be called when the discovery is finished. | ||
* @param timeout describes how long the operation may take before failing. | ||
* @return ClusterDiscoveryCallbackHandlePtr the discovery process handle. | ||
*/ | ||
virtual ClusterDiscoveryCallbackHandlePtr | ||
requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback, | ||
std::chrono::milliseconds timeout) PURE; | ||
}; | ||
|
||
using OdCdsApiHandlePtr = std::unique_ptr<OdCdsApiHandle>; | ||
|
||
class ClusterManagerFactory; | ||
|
||
// These are per-cluster per-thread, so not "global" stats. | ||
|
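The handle semantics described in the doc comments of this hunk can be illustrated with a minimal, single-threaded sketch. `FakeOdCds`, its nested `Handle`, `request`, and `finishDiscovery` are hypothetical names invented for illustration and are not part of Envoy. The sketch ignores timeouts and cross-thread dispatch, and it assumes the handle never outlives the object that created it.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

enum class ClusterDiscoveryStatus { Timeout, Missing, Available };
using ClusterDiscoveryCallback = std::function<void(ClusterDiscoveryStatus)>;

// Hypothetical stand-in for an on-demand CDS: it only stores pending callbacks.
class FakeOdCds {
public:
  // RAII handle: destroying it before discovery finishes drops the callback.
  // Destroying it after the callback ran is a no-op (the entry is already gone).
  class Handle {
  public:
    Handle(FakeOdCds& parent, uint64_t id) : parent_(parent), id_(id) {}
    ~Handle() { parent_.pending_.erase(id_); }

  private:
    FakeOdCds& parent_;
    uint64_t id_;
  };

  std::unique_ptr<Handle> request(const std::string& name, ClusterDiscoveryCallback cb) {
    const uint64_t id = next_id_++;
    pending_.emplace(id, Pending{name, std::move(cb)});
    return std::make_unique<Handle>(*this, id);
  }

  // Simulates the end of discovery for `name`: matching callbacks are removed
  // from the pending map first and invoked afterwards.
  void finishDiscovery(const std::string& name, ClusterDiscoveryStatus status) {
    std::vector<ClusterDiscoveryCallback> to_call;
    for (auto it = pending_.begin(); it != pending_.end();) {
      if (it->second.name == name) {
        to_call.push_back(std::move(it->second.callback));
        it = pending_.erase(it);
      } else {
        ++it;
      }
    }
    for (auto& cb : to_call) {
      cb(status);
    }
  }

private:
  struct Pending {
    std::string name;
    ClusterDiscoveryCallback callback;
  };
  std::map<uint64_t, Pending> pending_;
  uint64_t next_id_ = 0;
};
```

In this sketch, a caller that resets its handle before `finishDiscovery` runs never sees its callback invoked, while other callers waiting for the same cluster name are unaffected.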
@@ -309,6 +379,19 @@ class ClusterManager { | |
virtual const ClusterRequestResponseSizeStatNames& | ||
clusterRequestResponseSizeStatNames() const PURE; | ||
virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE; | ||
|
||
/** | ||
* Allocates an on-demand CDS API provider from configuration proto or locator. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are the semantics when we are using CDS and have both regular CDS wildcard subscription and OCDS on the same stream? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing this out. To rehash:
So, it looks like those two currently can't work together in one stream. Now, I don't know if this ignoring of requests with specific resource names while being in wildcard mode is enforced on Envoy side or this is an expectation from the config server. I don't know what is the rationale behind this restriction, so I'd ask if lifting this restriction would be an acceptable idea. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think @markdroth added that restriction in https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol. I also agree it would be good to be able to combine wildcard and on-demand CDS here on the same stream. Thoughts @markdroth? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As currently defined, it's not possible to have both a wildcard subscription and subscription to specific resource names on the same stream. Note, however, that wildcard subscriptions are intended to go away as part of the new naming scheme, so one option here would be to simply say that you can't do both unless you migrate to the new naming scheme. If that's not feasible, we could look into making some tranport protocol change to support this. For example, maybe the wildcard request could be represented by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Is there a reason why both wildcard subscription and subscription to specific resource names are impossible on the same stream? I can see the conflict between SotW wildcard subscription and subscription to specific resource names (SotW update would likely delete the clusters we got on-demand). Such conflict wouldn't exist with delta wildcard subscription. I suppose I could limit the on-demand CDS to be usable only with delta gRPC. About the naming scheme - I need to read about this, so can't say much about it. Just wanted to say that I dunno if cluster names coming from cluster_header action checking the
Wouldn't changing the wildcard request representation to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SG. Let's update https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol as part of this proposal then. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If CDS is configured and ODCDS is not configured, what initial request will contain in resource names? An empty list or I suppose that this "static" decision would be the way to go, because CDS is usually configured statically, while ODCDS might be configured much later (for example, ODCDS could be a part of some route configuration received from RDS). I'm probably wrong somewhere, so please correct me if it's the case. Thanks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we require this to be explicitly configured, then we need to figure out how to handle the case where someone configures CDS to leave the field unset instead of setting it to Note that currently, wildcard mode is determined based on the first request for that resource type on a given stream, and there is no way to subsequently switch in or out of wildcard mode for that resource type on that stream. So if the first CDS request on a stream has the https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return I think that what we should do here is to change the spec to make it possible to switch out of wildcard code after the first request on a stream. However, it will still be impossible to switch into wildcard mode once in non-wildcard mode, because otherwise there will be no way to unsubscribe from the last resource of a given type. In other words:
Note that in terms of the spec, I think we can say that servers should be prepared to accept In terms of the Envoy implementation, I think we can figure out what to send automatically at the moment that we send each request. We just need to track a bool indicating whether the stream has previously seen a non-wildcard subscription for a given resource type. If it hasn't, then we can leave the field unset; if it has, then we send FWIW, note that EDS never makes wildcard requests. The only resource types that make wildcard requests are LDS, CDS, and (I think) SRDS. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SG, I think your third case doesn't apply with delta xDS though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, for delta xDS, instead of the |
||
* | ||
* @param odcds_config is a configuration proto. Used when odcds_resources_locator is a nullopt. | ||
* @param odcds_resources_locator is a locator for ODCDS. Used over odcds_config if not a nullopt. | ||
* @param validation_visitor | ||
* @return OdCdsApiHandlePtr the ODCDS handle. | ||
*/ | ||
virtual OdCdsApiHandlePtr | ||
allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config, | ||
OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator, | ||
ProtobufMessage::ValidationVisitor& validation_visitor) PURE; | ||
}; | ||
|
||
using ClusterManagerPtr = std::unique_ptr<ClusterManager>; | ||
|
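The parameter precedence documented for `allocateOdCdsApi` (the locator wins over the configuration proto when it is present) can be sketched as below. `ConfigSource`, `ResourceLocator`, and `pickOdCdsSource` are hypothetical, simplified stand-ins rather than the real proto types or any Envoy function, and a nullable pointer is assumed here in place of `OptRef`.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified stand-ins for the proto types in the signature.
struct ConfigSource {
  std::string name;
};
struct ResourceLocator {
  std::string id;
};

// Returns a label describing which input would drive the ODCDS subscription.
// A null `locator` models an empty optional reference.
std::string pickOdCdsSource(const ConfigSource& config, const ResourceLocator* locator) {
  if (locator != nullptr) {
    // The locator is used over the config whenever it is provided.
    return "locator:" + locator->id;
  }
  // Otherwise fall back to the configuration proto.
  return "config:" + config.name;
}
```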
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,21 +22,23 @@ DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url, | |
[this](const auto& expired) { | ||
Protobuf::RepeatedPtrField<std::string> removed_resources; | ||
for (const auto& resource : expired) { | ||
setResourceWaitingForServer(resource); | ||
removed_resources.Add(std::string(resource)); | ||
if (setResourceWaitingForServer(resource)) { | ||
removed_resources.Add(std::string(resource)); | ||
} | ||
} | ||
|
||
watch_map_.onConfigUpdate({}, removed_resources, ""); | ||
}, | ||
dispatcher, dispatcher.timeSource()), | ||
type_url_(std::move(type_url)), wildcard_(wildcard), watch_map_(watch_map), | ||
type_url_(std::move(type_url)), | ||
mode_(wildcard ? WildcardMode::Implicit : WildcardMode::Disabled), watch_map_(watch_map), | ||
local_info_(local_info), dispatcher_(dispatcher) {} | ||
|
||
void DeltaSubscriptionState::updateSubscriptionInterest( | ||
const absl::flat_hash_set<std::string>& cur_added, | ||
const absl::flat_hash_set<std::string>& cur_removed) { | ||
for (const auto& a : cur_added) { | ||
setResourceWaitingForServer(a); | ||
addResourceWaitingForServer(a, ResourceType::ExplicitlyRequested); | ||
// If interest in a resource is removed-then-added (all before a discovery request | ||
// can be sent), we must treat it as a "new" addition: our user may have forgotten its | ||
// copy of the resource after instructing us to remove it, and need to be reminded of it. | ||
|
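The change to the TTL expiry lambda in this hunk means that an expired name is reported as removed only if the subscription state actually knows the resource. A reduced model of that behavior, with the hypothetical class `Tracker` standing in for `DeltaSubscriptionState` and a plain flag standing in for the real `ResourceState`, might look like this:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical, reduced model of the per-resource bookkeeping.
struct ResourceState {
  bool waiting_for_server = false;
};

class Tracker {
public:
  void add(const std::string& name) { state_[name] = ResourceState{}; }

  // Like the bool-returning helper in the diff: only a known resource can be
  // put back into the "waiting for server" state.
  bool setResourceWaitingForServer(const std::string& name) {
    auto it = state_.find(name);
    if (it == state_.end()) {
      return false;
    }
    it->second.waiting_for_server = true;
    return true;
  }

  // Collects the expired names that should be reported as removed.
  std::vector<std::string> onTtlExpired(const std::vector<std::string>& expired) {
    std::vector<std::string> removed;
    for (const auto& name : expired) {
      if (setResourceWaitingForServer(name)) {
        removed.push_back(name);
      }
    }
    return removed;
  }

private:
  std::map<std::string, ResourceState> state_;
};
```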
@@ -53,6 +55,31 @@ void DeltaSubscriptionState::updateSubscriptionInterest( | |
names_added_.erase(r); | ||
names_removed_.insert(r); | ||
} | ||
switch (mode_) { | ||
case WildcardMode::Implicit: | ||
if (names_removed_.find("*") != names_removed_.end()) { | ||
// we explicitly cancel the wildcard subscription | ||
mode_ = WildcardMode::Disabled; | ||
} else if (!names_added_.empty()) { | ||
Hmm. I'm not sure about this: an explicit mode requires presence of a

Only on the initial request (after the stream reset, for example), or on the first request where we decide to opt in to the wildcard mode. When switching from implicit to explicit mode, no need to send I'll need to go through the documentation I wrote to make sure that it's clear.
Sorry, I'm not following here. There isn't any logic that resets us to |
||
// switch to explicit mode if we requested some extra names | ||
mode_ = WildcardMode::Explicit; | ||
} | ||
break; | ||
|
||
case WildcardMode::Explicit: | ||
if (names_removed_.find("*") != names_removed_.end()) { | ||
// we explicitly cancel the wildcard subscription | ||
mode_ = WildcardMode::Disabled; | ||
} | ||
break; | ||
|
||
case WildcardMode::Disabled: | ||
if (names_added_.find("*") != names_added_.end()) { | ||
// we switch into an explicit wildcard subscription | ||
mode_ = WildcardMode::Explicit; | ||
} | ||
break; | ||
} | ||
} | ||
|
||
// Not having sent any requests yet counts as an "update pending" since you're supposed to resend | ||
|
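The mode switch added in this hunk can be read as a small state machine. The sketch below restates it as a pure function for illustration. `nextMode` is a hypothetical helper name, and its `added` and `removed` parameters stand for the pending `names_added_` and `names_removed_` sets after the update has been applied.

```cpp
#include <cassert>
#include <set>
#include <string>

enum class WildcardMode { Implicit, Explicit, Disabled };

// Hypothetical helper mirroring the switch in updateSubscriptionInterest().
WildcardMode nextMode(WildcardMode mode, const std::set<std::string>& added,
                      const std::set<std::string>& removed) {
  switch (mode) {
  case WildcardMode::Implicit:
    if (removed.count("*") != 0) {
      return WildcardMode::Disabled; // wildcard explicitly cancelled
    }
    if (!added.empty()) {
      return WildcardMode::Explicit; // extra names requested on top of the wildcard
    }
    break;
  case WildcardMode::Explicit:
    if (removed.count("*") != 0) {
      return WildcardMode::Disabled; // wildcard explicitly cancelled
    }
    break;
  case WildcardMode::Disabled:
    if (added.count("*") != 0) {
      return WildcardMode::Explicit; // opt in to the wildcard explicitly
    }
    break;
  }
  return mode; // no transition applies
}
```

Note that, as in the diff, nothing in this function ever returns to `Implicit`: once the subscription leaves implicit wildcard mode, the wildcard can only be held explicitly.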
@@ -124,7 +151,7 @@ void DeltaSubscriptionState::handleGoodResponse( | |
{ | ||
const auto scoped_update = ttl_.scopedTtlUpdate(); | ||
for (const auto& resource : message.resources()) { | ||
addResourceState(resource); | ||
addResourceState(resource, ResourceType::ReceivedFromServer); | ||
} | ||
} | ||
|
||
|
@@ -140,9 +167,7 @@ void DeltaSubscriptionState::handleGoodResponse( | |
// initial_resource_versions messages, but will remind us to explicitly tell the server "I'm | ||
// cancelling my subscription" when we lose interest. | ||
for (const auto& resource_name : message.removed_resources()) { | ||
if (resource_names_.find(resource_name) != resource_names_.end()) { | ||
setResourceWaitingForServer(resource_name); | ||
} | ||
setResourceWaitingForServer(resource_name); | ||
} | ||
ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_, | ||
message.resources().size(), message.removed_resources().size()); | ||
|
@@ -177,16 +202,20 @@ DeltaSubscriptionState::getNextRequestAckless() { | |
if (!resource_state.waitingForServer()) { | ||
(*request.mutable_initial_resource_versions())[resource_name] = resource_state.version(); | ||
} | ||
// As mentioned above, fill resource_names_subscribe with everything, including names we | ||
// have yet to receive any resource for unless this is a wildcard subscription, for which | ||
// the first request on a stream must be without any resource names. | ||
if (!wildcard_) { | ||
// Add resource names to resource_names_subscribe only if this is not a wildcard subscription | ||
// request or if we requested this resource explicitly (so we are actually in explicit | ||
// wildcard mode). | ||
if (mode_ == WildcardMode::Disabled || | ||
resource_state.type() == ResourceType::ExplicitlyRequested) { | ||
names_added_.insert(resource_name); | ||
} | ||
} | ||
// Wildcard subscription initial requests must have no resource_names_subscribe. | ||
if (wildcard_) { | ||
names_added_.clear(); | ||
// We are not clearing the names_added_ set. If we are in implicit wildcard subscription mode, | ||
// then the set should already be empty. If we are in explicit wildcard mode then the set will | ||
// contain the names we explicitly requested, but we need to add * to the list to make sure it's | ||
// sent too. | ||
if (mode_ == WildcardMode::Explicit) { | ||
names_added_.insert("*"); | ||
} | ||
names_removed_.clear(); | ||
} | ||
|
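How the first request on a stream is populated in each wildcard mode can be sketched as follows. `initialSubscribeNames` is a hypothetical helper, not an Envoy function. It starts from an empty set, whereas the real code inserts into the existing `names_added_` member, and the `ResourceType` enumerators are taken from their uses in the diff.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

enum class WildcardMode { Implicit, Explicit, Disabled };
enum class ResourceType { ExplicitlyRequested, ReceivedFromServer };

// Hypothetical helper: computes the names to put into resource_names_subscribe
// on the first request of a stream, following the logic in the hunk above.
std::set<std::string>
initialSubscribeNames(WildcardMode mode, const std::map<std::string, ResourceType>& resources) {
  std::set<std::string> names;
  for (const auto& entry : resources) {
    // Outside wildcard mode every known name is sent. In wildcard mode only
    // the names that were requested explicitly are sent.
    if (mode == WildcardMode::Disabled || entry.second == ResourceType::ExplicitlyRequested) {
      names.insert(entry.first);
    }
  }
  // Explicit wildcard mode must also carry "*" so the wildcard is kept.
  if (mode == WildcardMode::Explicit) {
    names.insert("*");
  }
  return names;
}
```

With only server-provided resources and implicit wildcard mode, the result is empty, which matches the requirement that an implicit wildcard request carries no resource names.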
@@ -214,26 +243,44 @@ DeltaSubscriptionState::getNextRequestWithAck(const UpdateAck& ack) { | |
} | ||
|
||
void DeltaSubscriptionState::addResourceState( | ||
const envoy::service::discovery::v3::Resource& resource) { | ||
const envoy::service::discovery::v3::Resource& resource, ResourceType type) { | ||
|
||
if (resource.has_ttl()) { | ||
ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())), | ||
resource.name()); | ||
} else { | ||
ttl_.clear(resource.name()); | ||
} | ||
|
||
resource_state_[resource.name()] = ResourceState(resource); | ||
resource_names_.insert(resource.name()); | ||
if (auto it = resource_state_.find(resource.name()); it != resource_state_.end()) { | ||
auto old_type = it->second.type(); | ||
it->second = ResourceState(resource, effectiveResourceType(old_type, type)); | ||
} else { | ||
resource_state_.insert({resource.name(), ResourceState(resource, type)}); | ||
} | ||
} | ||
|
||
bool DeltaSubscriptionState::setResourceWaitingForServer(const std::string& resource_name) { | ||
|
||
auto itr = resource_state_.find(resource_name); | ||
if (itr == resource_state_.end()) { | ||
return false; | ||
} | ||
auto old_type = itr->second.type(); | ||
itr->second = ResourceState(old_type); | ||
return true; | ||
} | ||
|
||
void DeltaSubscriptionState::setResourceWaitingForServer(const std::string& resource_name) { | ||
resource_state_[resource_name] = ResourceState(); | ||
resource_names_.insert(resource_name); | ||
void DeltaSubscriptionState::addResourceWaitingForServer(const std::string& resource_name, | ||
|
||
ResourceType type) { | ||
if (auto it = resource_state_.find(resource_name); it != resource_state_.end()) { | ||
auto old_type = it->second.type(); | ||
it->second = ResourceState(effectiveResourceType(old_type, type)); | ||
} else { | ||
resource_state_.insert({resource_name, ResourceState(type)}); | ||
} | ||
} | ||
|
||
void DeltaSubscriptionState::removeResourceState(const std::string& resource_name) { | ||
resource_state_.erase(resource_name); | ||
resource_names_.erase(resource_name); | ||
} | ||
|
||
} // namespace Config | ||
|
One thing that you'll need to add as we finalize this PR is version history, and probably also some docs like https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/on_demand_updates_filter.html. Maybe also update https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol to reference this (places where CDS and VHDS crop up are a good start).