diff --git a/test/integration/BUILD b/test/integration/BUILD index 7a69c2d66c86..1a3054e32fb9 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -1550,11 +1550,16 @@ envoy_cc_test( "//source/extensions/transport_sockets/tls:config", "//source/extensions/transport_sockets/tls:context_config_lib", "//source/extensions/transport_sockets/tls:context_lib", + "//test/common/grpc:grpc_client_integration_lib", "//test/integration/filters:test_listener_filter_lib", "//test/test_common:environment_lib", + "//test/test_common:status_utility_lib", "//test/test_common:utility_lib", + "@envoy_api//envoy/admin/v3:pkg_cc_proto", "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", + "@envoy_api//envoy/service/runtime/v3:pkg_cc_proto", + "@envoy_api//envoy/service/secret/v3:pkg_cc_proto", ], ) diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index cd92eed6326b..e2f57498bdd9 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -55,6 +55,46 @@ TEST_P(AdsIntegrationTest, BasicClusterInitialWarming) { test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); } +// Tests that the Envoy xDS client can handle updates to a subset of the subscribed resources from +// an xDS server without removing the resources not included in the DiscoveryResponse from the xDS +// server. +TEST_P(AdsIntegrationTest, UpdateToSubsetOfResources) { + initialize(); + registerTestServerPorts({}); + const auto cds_type_url = Config::getTypeUrl(); + const auto eds_type_url = + Config::getTypeUrl(); + + auto cluster_0 = buildCluster("cluster_0"); + auto cluster_1 = buildCluster("cluster_1"); + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse(cds_type_url, {cluster_0, cluster_1}, + {cluster_0, cluster_1}, {}, "1"); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {cluster_0.name(), cluster_1.name()}, + {cluster_0.name(), cluster_1.name()}, {})); + auto cla_0 = buildClusterLoadAssignment(cluster_0.name()); + auto cla_1 = buildClusterLoadAssignment(cluster_1.name()); + sendDiscoveryResponse( + eds_type_url, {cla_0, cla_1}, {cla_0, cla_1}, {}, "1"); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4); + + // Send an update for one of the ClusterLoadAssignments only. + cla_0.mutable_endpoints(0)->mutable_lb_endpoints(0)->mutable_load_balancing_weight()->set_value( + 2); + sendDiscoveryResponse(eds_type_url, {cla_0}, + {cla_0}, {}, "2"); + + // Verify that getting an update for only one of the ClusterLoadAssignment resources does not + // delete the other. We use cluster membership health as a proxy for this. + test_server_->waitForCounterEq("cluster.cluster_0.update_success", 2); + test_server_->waitForCounterEq("cluster.cluster_1.update_success", 1); + test_server_->waitForGaugeEq("cluster.cluster_0.membership_healthy", 1); + test_server_->waitForGaugeEq("cluster.cluster_1.membership_healthy", 1); +} + // Update the only warming cluster. Verify that the new cluster is still warming and the cluster // manager as a whole is not initialized. TEST_P(AdsIntegrationTest, ClusterInitializationUpdateTheOnlyWarmingCluster) { diff --git a/test/integration/base_integration_test.cc b/test/integration/base_integration_test.cc index 90e9e96746a4..e1c9dc9a876c 100644 --- a/test/integration/base_integration_test.cc +++ b/test/integration/base_integration_test.cc @@ -578,9 +578,14 @@ AssertionResult compareSets(const std::set& set1, const std::set& expected_resource_names, bool expect_node, - const Protobuf::int32 expected_error_code, const std::string& expected_error_substring) { + const Protobuf::int32 expected_error_code, const std::string& expected_error_substring, + FakeStream* stream) { + if (stream == nullptr) { + stream = xds_stream_.get(); + } + envoy::service::discovery::v3::DiscoveryRequest discovery_request; - VERIFY_ASSERTION(xds_stream_->waitForGrpcMessage(*dispatcher_, discovery_request)); + VERIFY_ASSERTION(stream->waitForGrpcMessage(*dispatcher_, discovery_request)); if (expect_node) { EXPECT_TRUE(discovery_request.has_node()); diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index e3954e7f0e61..3c1a33c5fbee 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -207,11 +207,15 @@ class BaseIntegrationTest : protected Logger::Loggable { const std::string& expected_type_url, const std::string& expected_version, const std::vector& expected_resource_names, bool expect_node = false, const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& expected_error_message = ""); + const std::string& expected_error_message = "", FakeStream* stream = nullptr); template void sendSotwDiscoveryResponse(const std::string& type_url, const std::vector& messages, - const std::string& version) { + const std::string& version, FakeStream* stream = nullptr) { + if (stream == nullptr) { + stream = xds_stream_.get(); + } + envoy::service::discovery::v3::DiscoveryResponse discovery_response; discovery_response.set_version_info(version); discovery_response.set_type_url(type_url); @@ -220,7 +224,7 @@ class BaseIntegrationTest : protected Logger::Loggable { } static int next_nonce_counter = 0; discovery_response.set_nonce(absl::StrCat("nonce", next_nonce_counter++)); - xds_stream_->sendGrpcMessage(discovery_response); + stream->sendGrpcMessage(discovery_response); } template diff --git a/test/integration/xds_integration_test.cc b/test/integration/xds_integration_test.cc index b571a70851be..8ab1dab2f1e7 100644 --- a/test/integration/xds_integration_test.cc +++ b/test/integration/xds_integration_test.cc @@ -1,13 +1,18 @@ +#include "envoy/admin/v3/config_dump.pb.h" #include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/service/runtime/v3/rtds.pb.h" +#include "envoy/service/secret/v3/sds.pb.h" #include "source/common/buffer/buffer_impl.h" +#include "test/common/grpc/grpc_client_integration.h" #include "test/integration/filters/test_listener_filter.h" #include "test/integration/http_integration.h" #include "test/integration/http_protocol_integration.h" #include "test/integration/ssl_utility.h" #include "test/test_common/environment.h" +#include "test/test_common/status_utility.h" #include "test/test_common/utility.h" #include "gtest/gtest.h" @@ -796,5 +801,215 @@ TEST_P(LdsStsIntegrationTest, TcpListenerRemoveFilterChainCalledAfterListenerIsR // removal at the worker is completed. This is the end of the in place update. test_server_->waitForGaugeEq("listener_manager.total_filter_chains_draining", 0); } + +constexpr char XDS_CLUSTER_NAME_1[] = "xds_cluster_1.lyft.com"; +constexpr char XDS_CLUSTER_NAME_2[] = "xds_cluster_2.lyft.com"; +constexpr char CLIENT_CERT_NAME[] = "client_cert"; + +class XdsSotwMultipleAuthoritiesTest : public HttpIntegrationTest, + public Grpc::GrpcClientIntegrationParamTest { +public: + XdsSotwMultipleAuthoritiesTest() + : HttpIntegrationTest(Http::CodecType::HTTP2, ipVersion(), + ConfigHelper::baseConfigNoListeners()) { + use_lds_ = false; + sotw_or_delta_ = Grpc::SotwOrDelta::Sotw; + skip_tag_extraction_rule_check_ = true; + + // Make the default cluster HTTP2. + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + ConfigHelper::setHttp2(*bootstrap.mutable_static_resources()->mutable_clusters(0)); + }); + + // Add a second cluster. + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* cluster = bootstrap.mutable_static_resources()->add_clusters(); + *cluster = bootstrap.mutable_static_resources()->clusters(0); + cluster->set_name("cluster_1"); + cluster->mutable_load_assignment()->set_cluster_name("cluster_1"); + }); + + // Add two xDS clusters. + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + addXdsCluster(bootstrap, std::string(XDS_CLUSTER_NAME_1)); + addXdsCluster(bootstrap, std::string(XDS_CLUSTER_NAME_2)); + }); + + // Set up the two static clusters with SSL using SDS. + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + setUpClusterSsl(*bootstrap.mutable_static_resources()->mutable_clusters(0), + std::string(XDS_CLUSTER_NAME_1), getXdsUpstream1()); + setUpClusterSsl(*bootstrap.mutable_static_resources()->mutable_clusters(1), + std::string(XDS_CLUSTER_NAME_2), getXdsUpstream2()); + }); + } + + void initialize() override { + HttpIntegrationTest::initialize(); + registerTestServerPorts({}); + } + + void TearDown() override { + closeConnection(xds_connection_1_); + closeConnection(xds_connection_2_); + cleanupUpstreamAndDownstream(); + codec_client_.reset(); + test_server_.reset(); + fake_upstreams_.clear(); + } + + void createUpstreams() override { + // Static cluster. + addFakeUpstream(Http::CodecType::HTTP2); + // Static cluster. + addFakeUpstream(Http::CodecType::HTTP2); + // XDS Cluster. + addFakeUpstream(Http::CodecType::HTTP2); + // XDS Cluster. + addFakeUpstream(Http::CodecType::HTTP2); + } + +protected: + FakeUpstream& getXdsUpstream1() { return *fake_upstreams_[2]; } + FakeUpstream& getXdsUpstream2() { return *fake_upstreams_[3]; } + + void addXdsCluster(envoy::config::bootstrap::v3::Bootstrap& bootstrap, + const std::string& cluster_name) { + auto* xds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + xds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + xds_cluster->set_name(cluster_name); + xds_cluster->mutable_load_assignment()->set_cluster_name(cluster_name); + ConfigHelper::setHttp2(*xds_cluster); + } + + void setUpClusterSsl(envoy::config::cluster::v3::Cluster& cluster, + const std::string& cluster_name, FakeUpstream& cluster_upstream) { + auto* transport_socket = cluster.mutable_transport_socket(); + envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext tls_context; + tls_context.set_sni("lyft.com"); + auto* secret_config = + tls_context.mutable_common_tls_context()->add_tls_certificate_sds_secret_configs(); + setUpSdsConfig(secret_config, CLIENT_CERT_NAME, cluster_name, cluster_upstream); + transport_socket->set_name("envoy.transport_sockets.tls"); + transport_socket->mutable_typed_config()->PackFrom(tls_context); + } + + void initXdsStream(FakeUpstream& upstream, FakeHttpConnectionPtr& connection, + FakeStreamPtr& stream) { + AssertionResult result = upstream.waitForHttpConnection(*dispatcher_, connection); + RELEASE_ASSERT(result, result.message()); + result = connection->waitForNewStream(*dispatcher_, stream); + RELEASE_ASSERT(result, result.message()); + stream->startGrpcStream(); + } + + void closeConnection(FakeHttpConnectionPtr& connection) { + AssertionResult result = connection->close(); + RELEASE_ASSERT(result, result.message()); + result = connection->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + connection.reset(); + } + + void setUpSdsConfig(envoy::extensions::transport_sockets::tls::v3::SdsSecretConfig* secret_config, + const std::string& secret_name, const std::string& cluster_name, + FakeUpstream& cluster_upstream) { + secret_config->set_name(secret_name); + auto* config_source = secret_config->mutable_sds_config(); + config_source->set_resource_api_version(envoy::config::core::v3::ApiVersion::V3); + auto* api_config_source = config_source->mutable_api_config_source(); + api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); + api_config_source->set_transport_api_version(envoy::config::core::v3::V3); + auto* grpc_service = api_config_source->add_grpc_services(); + setGrpcService(*grpc_service, cluster_name, cluster_upstream.localAddress()); + } + + envoy::extensions::transport_sockets::tls::v3::Secret + getClientSecret(const std::string& secret_name) { + envoy::extensions::transport_sockets::tls::v3::Secret secret; + secret.set_name(secret_name); + auto* tls_certificate = secret.mutable_tls_certificate(); + tls_certificate->mutable_certificate_chain()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/clientcert.pem")); + tls_certificate->mutable_private_key()->set_filename( + TestEnvironment::runfilesPath("test/config/integration/certs/clientkey.pem")); + return secret; + } + + envoy::admin::v3::ConfigDump getSecretsFromConfigDump() { + auto response = IntegrationUtil::makeSingleRequest( + lookupPort("admin"), "GET", "/config_dump?resource=dynamic_active_secrets", "", + downstreamProtocol(), version_); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + Json::ObjectSharedPtr loader = TestEnvironment::jsonLoadFromString(response->body()); + envoy::admin::v3::ConfigDump config_dump; + TestUtility::loadFromJson(loader->asJsonString(), config_dump); + return config_dump; + } + + FakeHttpConnectionPtr xds_connection_1_; + FakeStreamPtr xds_stream_1_; + FakeHttpConnectionPtr xds_connection_2_; + FakeStreamPtr xds_stream_2_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, XdsSotwMultipleAuthoritiesTest, + GRPC_CLIENT_INTEGRATION_PARAMS); + +// Verifies that if two different xDS servers send resources of the same name and same type, Envoy +// still treats them as two separate resources. +TEST_P(XdsSotwMultipleAuthoritiesTest, SameResourceNameAndTypeFromMultipleAuthorities) { + const std::string cert_name{CLIENT_CERT_NAME}; + + on_server_init_function_ = [this, &cert_name]() { + { + // SDS for the first cluster. + initXdsStream(getXdsUpstream1(), xds_connection_1_, xds_stream_1_); + EXPECT_TRUE(compareSotwDiscoveryRequest( + /*expected_type_url=*/Config::TypeUrl::get().Secret, + /*expected_version=*/"", + /*expected_resource_names=*/{cert_name}, /*expect_node=*/true, + Grpc::Status::WellKnownGrpcStatus::Ok, + /*expected_error_message=*/"", xds_stream_1_.get())); + auto sds_resource = getClientSecret(cert_name); + sendSotwDiscoveryResponse( + Config::TypeUrl::get().Secret, {sds_resource}, "1", xds_stream_1_.get()); + } + { + // SDS for the second cluster. + initXdsStream(getXdsUpstream2(), xds_connection_2_, xds_stream_2_); + EXPECT_TRUE(compareSotwDiscoveryRequest( + /*expected_type_url=*/Config::TypeUrl::get().Secret, + /*expected_version=*/"", + /*expected_resource_names=*/{cert_name}, /*expect_node=*/true, + Grpc::Status::WellKnownGrpcStatus::Ok, + /*expected_error_message=*/"", xds_stream_2_.get())); + auto sds_resource = getClientSecret(cert_name); + sendSotwDiscoveryResponse( + Config::TypeUrl::get().Secret, {sds_resource}, "1", xds_stream_2_.get()); + } + }; + + initialize(); + + // Wait until the discovery responses have been processed. + test_server_->waitForCounterGe( + "cluster.cluster_0.client_ssl_socket_factory.ssl_context_update_by_sds", 1); + test_server_->waitForCounterGe( + "cluster.cluster_1.client_ssl_socket_factory.ssl_context_update_by_sds", 1); + + auto config_dump = getSecretsFromConfigDump(); + // Two xDS resources with the same name and same type. + ASSERT_EQ(config_dump.configs_size(), 2); + envoy::admin::v3::SecretsConfigDump::DynamicSecret dynamic_secret; + ASSERT_OK(MessageUtil::unpackToNoThrow(config_dump.configs(0), dynamic_secret)); + EXPECT_EQ(cert_name, dynamic_secret.name()); + EXPECT_EQ("1", dynamic_secret.version_info()); + ASSERT_OK(MessageUtil::unpackToNoThrow(config_dump.configs(1), dynamic_secret)); + EXPECT_EQ(cert_name, dynamic_secret.name()); + EXPECT_EQ("1", dynamic_secret.version_info()); +} + } // namespace } // namespace Envoy