Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdstp: CDS/RDS/SRDS/EDS support added. #15593

Merged
merged 5 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,10 @@ message ScopedRds {
// Configuration source specifier for scoped RDS.
config.core.v3.ConfigSource scoped_rds_config_source = 1
[(validate.rules).message = {required: true}];

// xdstp:// resource locator for scoped RDS collection.
// [#not-implemented-hide:]
string srds_resources_locator = 2;
}

// [#next-free-field: 7]
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,15 @@ class GrpcMux {
* @param callbacks the callbacks to be notified of configuration updates. These must be valid
* until GrpcMuxWatch is destroyed.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @param use_namespace_matching if namespace watch should be created. This is used for creating
* watches on collections of resources; individual members of a collection are identified by the
* namespace in resource name.
* @param options subscription options.
* @return GrpcMuxWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes
* away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr.
*/
virtual GrpcMuxWatchPtr addWatch(const std::string& type_url,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) PURE;
const SubscriptionOptions& options) PURE;

virtual void requestOnDemandUpdate(const std::string& type_url,
const absl::flat_hash_set<std::string>& for_update) PURE;
Expand Down
15 changes: 15 additions & 0 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,21 @@ class SubscriptionCallbacks {
virtual void onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) PURE;
};

/**
* Options associated with a Subscription.
*/
struct SubscriptionOptions {
/**
* For legacy VHDS, should an xDS resource name be treated as <namespace>/<resource name>?
*/
bool use_namespace_matching_{};

/**
* For xdstp:// resource names, should node context parameters be added at the transport layer?
*/
bool add_xdstp_node_context_params_{};
};

/**
* Invoked when raw config received from xDS wire.
*/
Expand Down
4 changes: 2 additions & 2 deletions include/envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ class SubscriptionFactory {
* @param callbacks the callbacks needed by all Subscription objects, to deliver config updates.
* The callbacks must not result in the deletion of the Subscription object.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @param use_namespace_matching whether to use namespace match semantics on subscription.
* @param options subscription options.
*
* @return SubscriptionPtr subscription object corresponding for config and type_url.
*/
virtual SubscriptionPtr subscriptionFromConfigSource(
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, bool use_namespace_matching) PURE;
OpaqueResourceDecoder& resource_decoder, const SubscriptionOptions& options) PURE;

/**
* Collection subscription factory interface for xDS-TP URLs.
Expand Down
1 change: 1 addition & 0 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ class ClusterManagerFactory {
* Create a CDS API provider from configuration proto.
*/
virtual CdsApiPtr createCds(const envoy::config::core::v3::ConfigSource& cds_config,
const xds::core::v3::ResourceLocator* cds_resources_locator,
ClusterManager& cm) PURE;

/**
Expand Down
3 changes: 2 additions & 1 deletion source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, const bool) {
OpaqueResourceDecoder& resource_decoder,
const SubscriptionOptions&) {
auto watch =
std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url, *this);
ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);
Expand Down
12 changes: 7 additions & 5 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class GrpcMuxImpl : public GrpcMux,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching = false) override;
const SubscriptionOptions& options) override;

void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Expand Down Expand Up @@ -169,9 +169,10 @@ class GrpcMuxImpl : public GrpcMux,
// Envoy's dependency ordering.
std::list<std::string> subscriptions_;

// A queue to store requests while rate limited. Note that when requests cannot be sent due to the
// gRPC stream being down, this queue does not store them; rather, they are simply dropped.
// This string is a type URL.
// A queue to store requests while rate limited. Note that when requests
// cannot be sent due to the gRPC stream being down, this queue does not
// store them; rather, they are simply dropped. This string is a type
// URL.
std::unique_ptr<std::queue<std::string>> request_queue_;
const envoy::config::core::v3::ApiVersion transport_api_version_;

Expand All @@ -195,7 +196,8 @@ class NullGrpcMuxImpl : public GrpcMux,
}

GrpcMuxWatchPtr addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
SubscriptionCallbacks&, OpaqueResourceDecoder&, const bool) override {
SubscriptionCallbacks&, OpaqueResourceDecoder&,
const SubscriptionOptions&) override {
ExceptionUtil::throwEnvoyException("ADS must be configured to support an ADS config source");
}

Expand Down
13 changes: 6 additions & 7 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ GrpcSubscriptionImpl::GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux,
SubscriptionStats stats, absl::string_view type_url,
Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout,
bool is_aggregated, bool use_namespace_matching)
bool is_aggregated, const SubscriptionOptions& options)
: grpc_mux_(grpc_mux), callbacks_(callbacks), resource_decoder_(resource_decoder),
stats_(stats), type_url_(type_url), dispatcher_(dispatcher),
init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated),
use_namespace_matching_(use_namespace_matching) {}
init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated), options_(options) {}

// Config::Subscription
void GrpcSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resources) {
Expand All @@ -37,8 +36,7 @@ void GrpcSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resourc
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

watch_ =
grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, use_namespace_matching_);
watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, options_);

// The attempt stat here is maintained for the purposes of having consistency between ADS and
// gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
Expand Down Expand Up @@ -147,11 +145,12 @@ GrpcCollectionSubscriptionImpl::GrpcCollectionSubscriptionImpl(
const xds::core::v3::ResourceLocator& collection_locator, GrpcMuxSharedPtr grpc_mux,
SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder,
SubscriptionStats stats, Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated)
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
const SubscriptionOptions& options)
: GrpcSubscriptionImpl(
grpc_mux, callbacks, resource_decoder, stats,
TypeUtil::descriptorFullNameToTypeUrl(collection_locator.resource_type()), dispatcher,
init_fetch_timeout, is_aggregated, false),
init_fetch_timeout, is_aggregated, options),
collection_locator_(collection_locator) {}

void GrpcCollectionSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
Expand Down
7 changes: 4 additions & 3 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class GrpcSubscriptionImpl : public Subscription,
OpaqueResourceDecoder& resource_decoder, SubscriptionStats stats,
absl::string_view type_url, Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
bool use_namespace_matching);
const SubscriptionOptions& options);

// Config::Subscription
void start(const absl::flat_hash_set<std::string>& resource_names) override;
Expand Down Expand Up @@ -59,7 +59,7 @@ class GrpcSubscriptionImpl : public Subscription,
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
const bool is_aggregated_;
const bool use_namespace_matching_;
const SubscriptionOptions options_;

struct ResourceNameFormatter {
void operator()(std::string* out, const Config::DecodedResourceRef& resource) {
Expand All @@ -77,7 +77,8 @@ class GrpcCollectionSubscriptionImpl : public GrpcSubscriptionImpl {
GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, SubscriptionStats stats,
Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated);
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
const SubscriptionOptions& options);

void start(const absl::flat_hash_set<std::string>& resource_names) override;

Expand Down
49 changes: 22 additions & 27 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,58 +145,53 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) {
const SubscriptionOptions& options) {
auto entry = subscriptions_.find(type_url);
if (entry == subscriptions_.end()) {
// We don't yet have a subscription for type_url! Make one!
if (enable_type_url_downgrade_and_upgrade_) {
registerVersionedTypeUrl(type_url);
}
addSubscription(type_url, use_namespace_matching);
return addWatch(type_url, resources, callbacks, resource_decoder, use_namespace_matching);
addSubscription(type_url, options.use_namespace_matching_);
return addWatch(type_url, resources, callbacks, resource_decoder, options);
}

Watch* watch = entry->second->watch_map_.addWatch(callbacks, resource_decoder);
// updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
updateWatch(type_url, watch, resources, use_namespace_matching);
return std::make_unique<WatchImpl>(type_url, watch, *this);
updateWatch(type_url, watch, resources, options);
return std::make_unique<WatchImpl>(type_url, watch, *this, options);
}

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
const absl::flat_hash_set<std::string>& resources,
const bool creating_namespace_watch) {
const SubscriptionOptions& options) {
ASSERT(watch != nullptr);
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
// If this is a glob collection subscription, we need to compute actual context parameters.
absl::flat_hash_set<std::string> xdstp_resources;
// TODO(htuch): add support for resources beyond glob collections, the constraints below around
// resource size and ID reflect the progress of the xdstp:// implementation.
if (!resources.empty() && XdsResourceIdentifier::hasXdsTpScheme(*resources.begin())) {
// Callers must be asking for a single resource, the collection.
ASSERT(resources.size() == 1);
auto resource = XdsResourceIdentifier::decodeUrn(*resources.begin());
// We only know how to deal with glob collections and static context parameters right now.
// TODO(htuch): add support for dynamic context params and list collections in the future.
if (absl::EndsWith(resource.id(), "/*")) {
auto encoded_context = XdsContextParams::encodeResource(
local_info_.contextProvider().nodeContext(), resource.context(), {}, {});
resource.mutable_context()->CopyFrom(encoded_context);
// We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra
// context parameters.
absl::flat_hash_set<std::string> effective_resources;
for (const auto& resource : resources) {
if (XdsResourceIdentifier::hasXdsTpScheme(resource)) {
auto xdstp_resource = XdsResourceIdentifier::decodeUrn(*resources.begin());
htuch marked this conversation as resolved.
Show resolved Hide resolved
if (options.add_xdstp_node_context_params_) {
const auto context = XdsContextParams::encodeResource(
local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
xdstp_resource.mutable_context()->CopyFrom(context);
}
XdsResourceIdentifier::EncodeOptions encode_options;
encode_options.sort_context_params_ = true;
xdstp_resources.insert(XdsResourceIdentifier::encodeUrn(resource, encode_options));
effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
} else {
// TODO(htuch): We will handle list collections here in future work.
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
effective_resources.insert(resource);
}
}
auto added_removed = sub->second->watch_map_.updateWatchInterest(
watch, xdstp_resources.empty() ? resources : xdstp_resources);
if (creating_namespace_watch && xdstp_resources.empty()) {
htuch marked this conversation as resolved.
Show resolved Hide resolved
auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, effective_resources);
if (options.use_namespace_matching_) {
// This is to prevent sending out of requests that contain prefixes instead of resource names
sub->second->sub_state_.updateSubscriptionInterest({}, {});
} else {
Expand All @@ -222,7 +217,7 @@ void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
}

void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
updateWatch(type_url, watch, {});
updateWatch(type_url, watch, {}, {});
auto entry = subscriptions_.find(type_url);
ASSERT(entry != subscriptions_.end(),
fmt::format("removeWatch() called for non-existent subscription {}.", type_url));
Expand Down
14 changes: 8 additions & 6 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class NewGrpcMuxImpl
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching = false) override;
const SubscriptionOptions& options) override;

void requestOnDemandUpdate(const std::string& type_url,
const absl::flat_hash_set<std::string>& for_update) override;
Expand Down Expand Up @@ -90,8 +90,9 @@ class NewGrpcMuxImpl
private:
class WatchImpl : public GrpcMuxWatch {
public:
WatchImpl(const std::string& type_url, Watch* watch, NewGrpcMuxImpl& parent)
: type_url_(type_url), watch_(watch), parent_(parent) {}
WatchImpl(const std::string& type_url, Watch* watch, NewGrpcMuxImpl& parent,
const SubscriptionOptions& options)
: type_url_(type_url), watch_(watch), parent_(parent), options_(options) {}

~WatchImpl() override { remove(); }

Expand All @@ -103,13 +104,14 @@ class NewGrpcMuxImpl
}

void update(const absl::flat_hash_set<std::string>& resources) override {
parent_.updateWatch(type_url_, watch_, resources);
parent_.updateWatch(type_url_, watch_, resources, options_);
}

private:
const std::string type_url_;
Watch* watch_;
NewGrpcMuxImpl& parent_;
const SubscriptionOptions options_;
};

void removeWatch(const std::string& type_url, Watch* watch);
Expand All @@ -119,9 +121,9 @@ class NewGrpcMuxImpl
// subscription will enqueue and attempt to send an appropriate discovery request.
void updateWatch(const std::string& type_url, Watch* watch,
const absl::flat_hash_set<std::string>& resources,
bool creating_namespace_watch = false);
const SubscriptionOptions& options);

void addSubscription(const std::string& type_url, const bool use_namespace_matching);
void addSubscription(const std::string& type_url, bool use_namespace_matching);

void trySendDiscoveryRequests();

Expand Down
Loading