Skip to content

Commit

Permalink
Add support for explicit wildcard resource (#16855)
Browse files Browse the repository at this point in the history
This enables a client to send a special resource named * to a server as a way to express an interest in a wildcard subscription. That way the wildcard subscription can coexist with subscriptions to other resources on the same stream. The wildcard resource, like any other resource, can be subscribed to and unsubscribed from at any time. The legacy way of enabling wildcard subscription (by sending an empty initial message) is still supported, but otherwise behaves just like a subscription to *. This behavior is gated behind the enabled-by-default "envoy.restart_features.explicit_wildcard_resource" runtime guard.

Risk Level:
High, changes in wildcard mode.

Testing:
As a part of #15523. Also done internally.

Docs Changes:
Updated version history.

Release Notes:
xds: implemented the wildcard resource, which can be used by xDS to express an interest in wildcard subscription in a way that allows subscriptions to other resources coexist on the same stream instead of being ignored. This means that the resource name * is reserved. See :ref:wildcard mode description <xds_protocol_resource_hints> and :ref:unsubscribing from resources <xds_protocol_unsubscribing> for details. This behavior is gated behind the envoy.restart_features.explicit_wildcard_resource runtime guard.
Platform Specific Features:
N/A

Runtime guard:
envoy.restart_features.explicit_wildcard_resource enabled by default.

Signed-off-by: Krzesimir Nowak <[email protected]>
  • Loading branch information
krnowak authored Oct 21, 2021
1 parent b404746 commit 4fc3612
Show file tree
Hide file tree
Showing 29 changed files with 2,651 additions and 421 deletions.
2 changes: 2 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Incompatible Behavior Changes
-----------------------------
*Changes that are expected to cause an incompatibility if applicable; deployment changes are likely required*

* xds: ``*`` became a reserved name for a wildcard resource that can be subscribed to and unsubscribed from at any time. This is a requirement for implementing the on-demand xDSes (like on-demand CDS) that can subscribe to specific resources next to their wildcard subscription. If such xDS is subscribed to both wildcard resource and to other specific resource, then in stream reconnection scenario, the xDS will not send an empty initial request, but a request containing ``*`` for wildcard subscription and the rest of the resources the xDS is subscribed to. If the xDS is only subscribed to wildcard resource, it will try to send a legacy wildcard request. This behavior implements the recent changes in :ref:`xDS protocol <xds_protocol>` and can be temporarily reverted by setting the ``envoy.restart_features.explicit_wildcard_resource`` runtime guard to false.

Minor Behavior Changes
----------------------
*Changes that may cause incompatibilities for some users, but should not for most*
Expand Down
13 changes: 11 additions & 2 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,16 @@ envoy_cc_library(

envoy_cc_library(
name = "delta_subscription_state_lib",
srcs = ["delta_subscription_state.cc"],
hdrs = ["delta_subscription_state.h"],
srcs = [
"delta_subscription_state.cc",
"new_delta_subscription_state.cc",
"old_delta_subscription_state.cc",
],
hdrs = [
"delta_subscription_state.h",
"new_delta_subscription_state.h",
"old_delta_subscription_state.h",
],
deps = [
":api_version_lib",
":pausable_ack_queue_lib",
Expand Down Expand Up @@ -402,6 +410,7 @@ envoy_cc_library(
hdrs = ["watch_map.h"],
deps = [
":decoded_resource_lib",
":utility_lib",
":xds_resource_lib",
"//envoy/config:subscription_interface",
"//source/common/common:assert_lib",
Expand Down
260 changes: 60 additions & 200 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
@@ -1,243 +1,103 @@
#include "source/common/config/delta_subscription_state.h"

#include "envoy/event/dispatcher.h"
#include "envoy/service/discovery/v3/discovery.pb.h"

#include "source/common/common/assert.h"
#include "source/common/common/hash.h"
#include "source/common/config/utility.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace Config {
namespace {

DeltaSubscriptionStateVariant getState(std::string type_url,
UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info,
Event::Dispatcher& dispatcher) {
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.explicit_wildcard_resource")) {
return DeltaSubscriptionStateVariant(absl::in_place_type<NewDeltaSubscriptionState>,
std::move(type_url), watch_map, local_info, dispatcher);
} else {
return DeltaSubscriptionStateVariant(absl::in_place_type<OldDeltaSubscriptionState>,
std::move(type_url), watch_map, local_info, dispatcher);
}
}

} // namespace

DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info,
Event::Dispatcher& dispatcher, const bool wildcard)
// TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
// empty resources as updates.
: supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"),
ttl_(
[this](const auto& expired) {
Protobuf::RepeatedPtrField<std::string> removed_resources;
for (const auto& resource : expired) {
setResourceWaitingForServer(resource);
removed_resources.Add(std::string(resource));
}

watch_map_.onConfigUpdate({}, removed_resources, "");
},
dispatcher, dispatcher.timeSource()),
type_url_(std::move(type_url)), wildcard_(wildcard), watch_map_(watch_map),
local_info_(local_info), dispatcher_(dispatcher) {}
Event::Dispatcher& dispatcher)
: state_(getState(std::move(type_url), watch_map, local_info, dispatcher)) {}

void DeltaSubscriptionState::updateSubscriptionInterest(
const absl::flat_hash_set<std::string>& cur_added,
const absl::flat_hash_set<std::string>& cur_removed) {
for (const auto& a : cur_added) {
setResourceWaitingForServer(a);
// If interest in a resource is removed-then-added (all before a discovery request
// can be sent), we must treat it as a "new" addition: our user may have forgotten its
// copy of the resource after instructing us to remove it, and need to be reminded of it.
names_removed_.erase(a);
names_added_.insert(a);
}
for (const auto& r : cur_removed) {
removeResourceState(r);
// Ideally, when interest in a resource is added-then-removed in between requests,
// we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
// in the request. However, the removed-then-added case *does* need to go in the request,
// and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
// add-remove (because "remove-add" has to be treated as equivalent to just "add").
names_added_.erase(r);
names_removed_.insert(r);
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
state->updateSubscriptionInterest(cur_added, cur_removed);
return;
}
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
state.updateSubscriptionInterest(cur_added, cur_removed);
}

// Not having sent any requests yet counts as an "update pending" since you're supposed to resend
// the entirety of your interest at the start of a stream, even if nothing has changed.
bool DeltaSubscriptionState::subscriptionUpdatePending() const {
return !names_added_.empty() || !names_removed_.empty() ||
!any_request_sent_yet_in_current_stream_ || must_send_discovery_request_;
void DeltaSubscriptionState::setMustSendDiscoveryRequest() {
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
state->setMustSendDiscoveryRequest();
return;
}
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
state.setMustSendDiscoveryRequest();
}

UpdateAck DeltaSubscriptionState::handleResponse(
const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
// We *always* copy the response's nonce into the next request, even if we're going to make that
// request a NACK by setting error_detail.
UpdateAck ack(message.nonce(), type_url_);
TRY_ASSERT_MAIN_THREAD { handleGoodResponse(message); }
END_TRY
catch (const EnvoyException& e) {
handleBadResponse(e, ack);
bool DeltaSubscriptionState::subscriptionUpdatePending() const {
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
return state->subscriptionUpdatePending();
}
return ack;
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
return state.subscriptionUpdatePending();
}

bool DeltaSubscriptionState::isHeartbeatResponse(
const envoy::service::discovery::v3::Resource& resource) const {
if (!supports_heartbeats_ &&
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.vhds_heartbeats")) {
return false;
}
const auto itr = resource_state_.find(resource.name());
if (itr == resource_state_.end()) {
return false;
void DeltaSubscriptionState::markStreamFresh() {
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
state->markStreamFresh();
return;
}

return !resource.has_resource() && !itr->second.waitingForServer() &&
resource.version() == itr->second.version();
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
state.markStreamFresh();
}

void DeltaSubscriptionState::handleGoodResponse(
UpdateAck DeltaSubscriptionState::handleResponse(
const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
absl::flat_hash_set<std::string> names_added_removed;
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> non_heartbeat_resources;
for (const auto& resource : message.resources()) {
if (!names_added_removed.insert(resource.name()).second) {
throw EnvoyException(
fmt::format("duplicate name {} found among added/updated resources", resource.name()));
}
if (isHeartbeatResponse(resource)) {
continue;
}
non_heartbeat_resources.Add()->CopyFrom(resource);
// DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
if (!resource.has_resource() && resource.aliases_size() > 0) {
continue;
}
if (message.type_url() != resource.resource().type_url()) {
throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
"the message-wide type URL {} in DeltaDiscoveryResponse {}",
resource.resource().type_url(), message.type_url(),
message.DebugString()));
}
}
for (const auto& name : message.removed_resources()) {
if (!names_added_removed.insert(name).second) {
throw EnvoyException(
fmt::format("duplicate name {} found in the union of added+removed resources", name));
}
}

{
const auto scoped_update = ttl_.scopedTtlUpdate();
for (const auto& resource : message.resources()) {
if (wildcard_ || resource_state_.contains(resource.name())) {
// Only consider tracked resources.
// NOTE: This is not gonna work for xdstp resources with glob resource matching.
addResourceState(resource);
}
}
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
return state->handleResponse(message);
}

watch_map_.onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
message.system_version_info());

// If a resource is gone, there is no longer a meaningful version for it that makes sense to
// provide to the server upon stream reconnect: either it will continue to not exist, in which
// case saying nothing is fine, or the server will bring back something new, which we should
// receive regardless (which is the logic that not specifying a version will get you).
//
// So, leave the version map entry present but blank. It will be left out of
// initial_resource_versions messages, but will remind us to explicitly tell the server "I'm
// cancelling my subscription" when we lose interest.
for (const auto& resource_name : message.removed_resources()) {
if (resource_names_.find(resource_name) != resource_names_.end()) {
setResourceWaitingForServer(resource_name);
}
}
ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_,
message.resources().size(), message.removed_resources().size());
}

void DeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAck& ack) {
// Note that error_detail being set is what indicates that a DeltaDiscoveryRequest is a NACK.
ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what());
watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
return state.handleResponse(message);
}

void DeltaSubscriptionState::handleEstablishmentFailure() {
watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
nullptr);
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
state->handleEstablishmentFailure();
return;
}
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
state.handleEstablishmentFailure();
}

envoy::service::discovery::v3::DeltaDiscoveryRequest
DeltaSubscriptionState::getNextRequestAckless() {
envoy::service::discovery::v3::DeltaDiscoveryRequest request;
must_send_discovery_request_ = false;
if (!any_request_sent_yet_in_current_stream_) {
any_request_sent_yet_in_current_stream_ = true;
// initial_resource_versions "must be populated for first request in a stream".
// Also, since this might be a new server, we must explicitly state *all* of our subscription
// interest.
for (auto const& [resource_name, resource_state] : resource_state_) {
// Populate initial_resource_versions with the resource versions we currently have.
// Resources we are interested in, but are still waiting to get any version of from the
// server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
if (!resource_state.waitingForServer()) {
(*request.mutable_initial_resource_versions())[resource_name] = resource_state.version();
}
// As mentioned above, fill resource_names_subscribe with everything, including names we
// have yet to receive any resource for unless this is a wildcard subscription, for which
// the first request on a stream must be without any resource names.
if (!wildcard_) {
names_added_.insert(resource_name);
}
}
// Wildcard subscription initial requests must have no resource_names_subscribe.
if (wildcard_) {
names_added_.clear();
}
names_removed_.clear();
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
return state->getNextRequestAckless();
}
std::copy(names_added_.begin(), names_added_.end(),
Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_subscribe()));
std::copy(names_removed_.begin(), names_removed_.end(),
Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_unsubscribe()));
names_added_.clear();
names_removed_.clear();

request.set_type_url(type_url_);
request.mutable_node()->MergeFrom(local_info_.node());
return request;
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
return state.getNextRequestAckless();
}

envoy::service::discovery::v3::DeltaDiscoveryRequest
DeltaSubscriptionState::getNextRequestWithAck(const UpdateAck& ack) {
envoy::service::discovery::v3::DeltaDiscoveryRequest request = getNextRequestAckless();
request.set_response_nonce(ack.nonce_);
if (ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
// Don't needlessly make the field present-but-empty if status is ok.
request.mutable_error_detail()->CopyFrom(ack.error_detail_);
if (auto* state = absl::get_if<OldDeltaSubscriptionState>(&state_); state != nullptr) {
return state->getNextRequestWithAck(ack);
}
return request;
}

void DeltaSubscriptionState::addResourceState(
const envoy::service::discovery::v3::Resource& resource) {
if (resource.has_ttl()) {
ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
resource.name());
} else {
ttl_.clear(resource.name());
}

resource_state_[resource.name()] = ResourceState(resource);
resource_names_.insert(resource.name());
}

void DeltaSubscriptionState::setResourceWaitingForServer(const std::string& resource_name) {
resource_state_[resource_name] = ResourceState();
resource_names_.insert(resource_name);
}

void DeltaSubscriptionState::removeResourceState(const std::string& resource_name) {
resource_state_.erase(resource_name);
resource_names_.erase(resource_name);
auto& state = absl::get<NewDeltaSubscriptionState>(state_);
return state.getNextRequestWithAck(ack);
}

} // namespace Config
Expand Down
Loading

0 comments on commit 4fc3612

Please sign in to comment.