From b1e62a3f3453ee79f29a8f7a545786f7e419fc11 Mon Sep 17 00:00:00 2001 From: Phil Genera Date: Thu, 9 Jul 2020 14:48:06 -0400 Subject: [PATCH] eds: decrease computational complexity of updates (#11442) Makes BaseDynamicClusterImpl::updateDynamicHostList O(n) rather than O(n^2). Instead of calling .erase() on list iterators as we find them, we use std::remove_if to move the hosts we are keeping to the front of the list and erase the leftover tail once after iterating over the list. This shows a ~3x improvement in execution time in the included benchmark test. Risk Level: Medium. No reordering happens to the endpoint list. Not runtime guarded. Testing: New benchmark, existing unit tests pass (and cover the affected function). Docs Changes: N/A Release Notes: N/A Relates to #2874 #11362 Signed-off-by: Phil Genera --- bazel/test_for_benchmark_wrapper.sh | 6 +- source/common/upstream/upstream_impl.cc | 58 +++++++------ test/benchmark/BUILD | 2 + test/benchmark/main.cc | 32 ++++++- test/benchmark/main.h | 13 +++ test/common/upstream/eds_speed_test.cc | 111 ++++++++++++++++-------- 6 files changed, 155 insertions(+), 67 deletions(-) create mode 100644 test/benchmark/main.h diff --git a/bazel/test_for_benchmark_wrapper.sh b/bazel/test_for_benchmark_wrapper.sh index 7c1dc7a1def6..37de6d0d0d81 100755 --- a/bazel/test_for_benchmark_wrapper.sh +++ b/bazel/test_for_benchmark_wrapper.sh @@ -1,4 +1,6 @@ #!/bin/bash -# Set the benchmark time to 0 to just verify that the benchmark runs to completion. -"${TEST_SRCDIR}/envoy/$@" --benchmark_min_time=0 +# Set the benchmark time to 0 to just verify that the benchmark runs to +# completion. We're interacting with two different flag parsers, so the order +# of flags and the -- matters.
+"${TEST_SRCDIR}/envoy/$@" --skip_expensive_benchmarks -- --benchmark_min_time=0 diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 7967a7d1ba96..97776daa4901 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -1332,9 +1332,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, bool hosts_changed = false; // Go through and see if the list we have is different from what we just got. If it is, we make a - // new host list and raise a change notification. This uses an N^2 search given that this does not - // happen very often and the list sizes should be small (see - // https://github.com/envoyproxy/envoy/issues/2874). We also check for duplicates here. It's + // new host list and raise a change notification. We also check for duplicates here. It's // possible for DNS to return the same address multiple times, and a bad EDS implementation could // do the same thing. @@ -1437,16 +1435,20 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, // Remove hosts from current_priority_hosts that were matched to an existing host in the previous // loop. 
- for (auto itr = current_priority_hosts.begin(); itr != current_priority_hosts.end();) { - auto existing_itr = existing_hosts_for_current_priority.find((*itr)->address()->asString()); + auto erase_from = + std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(), + [&existing_hosts_for_current_priority](const HostSharedPtr& p) { + auto existing_itr = + existing_hosts_for_current_priority.find(p->address()->asString()); - if (existing_itr != existing_hosts_for_current_priority.end()) { - existing_hosts_for_current_priority.erase(existing_itr); - itr = current_priority_hosts.erase(itr); - } else { - itr++; - } - } + if (existing_itr != existing_hosts_for_current_priority.end()) { + existing_hosts_for_current_priority.erase(existing_itr); + return true; + } + + return false; + }); + current_priority_hosts.erase(erase_from, current_priority_hosts.end()); // If we saw existing hosts during this iteration from a different priority, then we've moved // a host from another priority into this one, so we should mark the priority as having changed. 
@@ -1464,21 +1466,23 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, const bool dont_remove_healthy_hosts = health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval(); if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) { - for (auto i = current_priority_hosts.begin(); i != current_priority_hosts.end();) { - if (!((*i)->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) || - (*i)->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) { - if ((*i)->weight() > max_host_weight) { - max_host_weight = (*i)->weight(); - } - - final_hosts.push_back(*i); - updated_hosts[(*i)->address()->asString()] = *i; - (*i)->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); - i = current_priority_hosts.erase(i); - } else { - i++; - } - } + erase_from = + std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(), + [&updated_hosts, &final_hosts, &max_host_weight](const HostSharedPtr& p) { + if (!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) || + p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) { + if (p->weight() > max_host_weight) { + max_host_weight = p->weight(); + } + + final_hosts.push_back(p); + updated_hosts[p->address()->asString()] = p; + p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); + return true; + } + return false; + }); + current_priority_hosts.erase(erase_from, current_priority_hosts.end()); } // At this point we've accounted for all the new hosts as well the hosts that previously diff --git a/test/benchmark/BUILD b/test/benchmark/BUILD index afba86c9dd22..fa01e3b1ce63 100644 --- a/test/benchmark/BUILD +++ b/test/benchmark/BUILD @@ -11,8 +11,10 @@ envoy_package() envoy_cc_test_library( name = "main", srcs = ["main.cc"], + hdrs = ["main.h"], external_deps = [ "benchmark", + "tclap", ], deps = [ "//test/test_common:environment_lib", diff --git a/test/benchmark/main.cc b/test/benchmark/main.cc index 7afdf85e6558..6c23c1031a6c 100644 --- a/test/benchmark/main.cc 
+++ b/test/benchmark/main.cc @@ -1,16 +1,40 @@ // NOLINT(namespace-envoy) // This is an Envoy driver for benchmarks. +#include "test/benchmark/main.h" + #include "test/test_common/environment.h" #include "benchmark/benchmark.h" +#include "tclap/CmdLine.h" + +static bool skip_expensive_benchmarks = false; -// Boilerplate main(), which discovers benchmarks and runs them. +// Boilerplate main(), which discovers benchmarks and runs them. This uses two +// different flag parsers, so the order of flags matters: flags defined here +// must be passed first, and flags defined in benchmark::Initialize second, +// separated by --. +// TODO(pgenera): convert this to abseil/flags/ when benchmark also adopts abseil. int main(int argc, char** argv) { Envoy::TestEnvironment::initializeTestMain(argv[0]); - benchmark::Initialize(&argc, argv); - if (benchmark::ReportUnrecognizedArguments(argc, argv)) { - return 1; + // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall) + TCLAP::CmdLine cmd("envoy-benchmark-test", ' ', "0.1"); + TCLAP::SwitchArg skip_switch("s", "skip_expensive_benchmarks", + "skip or minimize expensive benchmarks", cmd, false); + + cmd.setExceptionHandling(false); + try { + cmd.parse(argc, argv); + } catch (const TCLAP::ExitException& e) { + // parse() throws an ExitException with status 0 after printing the output + // for --help and --version. + return 0; } + + skip_expensive_benchmarks = skip_switch.getValue(); + + benchmark::Initialize(&argc, argv); benchmark::RunSpecifiedBenchmarks(); } + +bool Envoy::benchmark::skipExpensiveBenchmarks() { return skip_expensive_benchmarks; } diff --git a/test/benchmark/main.h b/test/benchmark/main.h new file mode 100644 index 000000000000..efb6797a74ef --- /dev/null +++ b/test/benchmark/main.h @@ -0,0 +1,13 @@ +#pragma once + +/** + * Benchmarks can use this to skip or hurry through long-running tests in CI. 
+ */ + +namespace Envoy { +namespace benchmark { + +bool skipExpensiveBenchmarks(); + +} +} // namespace Envoy diff --git a/test/common/upstream/eds_speed_test.cc b/test/common/upstream/eds_speed_test.cc index a03af737483e..c227dfe4f39d 100644 --- a/test/common/upstream/eds_speed_test.cc +++ b/test/common/upstream/eds_speed_test.cc @@ -16,6 +16,7 @@ #include "server/transport_socket_config_impl.h" +#include "test/benchmark/main.h" #include "test/common/upstream/utility.h" #include "test/mocks/local_info/mocks.h" #include "test/mocks/protobuf/mocks.h" @@ -28,12 +29,15 @@ #include "benchmark/benchmark.h" +using ::benchmark::State; +using Envoy::benchmark::skipExpensiveBenchmarks; + namespace Envoy { namespace Upstream { class EdsSpeedTest { public: - EdsSpeedTest(benchmark::State& state, bool v2_config) + EdsSpeedTest(State& state, bool v2_config) : state_(state), v2_config_(v2_config), type_url_(v2_config_ ? "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" @@ -44,7 +48,26 @@ class EdsSpeedTest { local_info_, std::unique_ptr(async_client_), dispatcher_, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"), - envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, {}, true)) {} + envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, {}, true)) { + resetCluster(R"EOF( + name: name + connect_timeout: 0.25s + type: EDS + eds_cluster_config: + service_name: fare + eds_config: + api_config_source: + cluster_names: + - eds + refresh_delay: 1s + )EOF", + Envoy::Upstream::Cluster::InitializePhase::Secondary); + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_)); + cluster_->initialize([this] { initialized_ = true; }); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_)); + subscription_->start({"fare"}); + } void resetCluster(const std::string& yaml_config, Cluster::InitializePhase initialize_phase) { 
local_info_.node_.mutable_locality()->set_zone("us-east-1a"); @@ -64,30 +87,14 @@ class EdsSpeedTest { std::chrono::milliseconds(), false); } - void initialize() { - EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_)); - cluster_->initialize([this] { initialized_ = true; }); - } - // Set up an EDS config with multiple priorities, localities, weights and make sure - // they are loaded and reloaded as expected. - void priorityAndLocalityWeightedHelper(bool ignore_unknown_dynamic_fields, size_t num_hosts) { + // they are loaded as expected. + void priorityAndLocalityWeightedHelper(bool ignore_unknown_dynamic_fields, size_t num_hosts, + bool healthy) { state_.PauseTiming(); + envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; cluster_load_assignment.set_cluster_name("fare"); - resetCluster(R"EOF( - name: name - connect_timeout: 0.25s - type: EDS - eds_cluster_config: - service_name: fare - eds_config: - api_config_source: - cluster_names: - - eds - refresh_delay: 1s - )EOF", - Envoy::Upstream::Cluster::InitializePhase::Secondary); // Add a whole bunch of hosts in a single place: auto* endpoints = cluster_load_assignment.add_endpoints(); @@ -100,10 +107,14 @@ class EdsSpeedTest { uint32_t port = 1000; for (size_t i = 0; i < num_hosts; ++i) { - auto* socket_address = endpoints->add_lb_endpoints() - ->mutable_endpoint() - ->mutable_address() - ->mutable_socket_address(); + auto* lb_endpoint = endpoints->add_lb_endpoints(); + if (healthy) { + lb_endpoint->set_health_status(envoy::config::core::v3::HEALTHY); + } else { + lb_endpoint->set_health_status(envoy::config::core::v3::UNHEALTHY); + } + auto* socket_address = + lb_endpoint->mutable_endpoint()->mutable_address()->mutable_socket_address(); socket_address->set_address("10.0.1." 
+ std::to_string(i / 60000)); socket_address->set_port_value((port + i) % 60000); } @@ -111,7 +122,6 @@ class EdsSpeedTest { // this is what we're actually testing: validation_visitor_.setSkipValidation(ignore_unknown_dynamic_fields); - initialize(); auto response = std::make_unique(); response->set_type_url(type_url_); auto* resource = response->mutable_resources()->Add(); @@ -122,16 +132,13 @@ class EdsSpeedTest { ""); resource->set_type_url("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"); } - EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_)); - subscription_->start({"fare"}); state_.ResumeTiming(); grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); - ASSERT(initialized_); ASSERT(cluster_->prioritySet().hostSetsPerPriority()[1]->hostsPerLocality().get()[0].size() == num_hosts); } - benchmark::State& state_; + State& state_; const bool v2_config_; const std::string type_url_; bool initialized_{}; @@ -162,14 +169,50 @@ class EdsSpeedTest { } // namespace Upstream } // namespace Envoy -static void priorityAndLocalityWeighted(benchmark::State& state) { +static void priorityAndLocalityWeighted(State& state) { Envoy::Thread::MutexBasicLockable lock; Envoy::Logger::Context logging_state(spdlog::level::warn, Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false); for (auto _ : state) { Envoy::Upstream::EdsSpeedTest speed_test(state, state.range(0)); - speed_test.priorityAndLocalityWeightedHelper(state.range(1), state.range(2)); + // if we've been instructed to skip tests, only run once no matter the argument: + uint32_t endpoints = skipExpensiveBenchmarks() ? 
1 : state.range(2); + + speed_test.priorityAndLocalityWeightedHelper(state.range(1), endpoints, true); + } +} + +BENCHMARK(priorityAndLocalityWeighted) + ->Ranges({{false, true}, {false, true}, {1, 100000}}) + ->Unit(benchmark::kMillisecond); + +static void duplicateUpdate(State& state) { + Envoy::Thread::MutexBasicLockable lock; + Envoy::Logger::Context logging_state(spdlog::level::warn, + Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false); + + for (auto _ : state) { + Envoy::Upstream::EdsSpeedTest speed_test(state, false); + uint32_t endpoints = skipExpensiveBenchmarks() ? 1 : state.range(0); + + speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true); + speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true); + } +} + +BENCHMARK(duplicateUpdate)->Range(1, 100000)->Unit(benchmark::kMillisecond); + +static void healthOnlyUpdate(State& state) { + Envoy::Thread::MutexBasicLockable lock; + Envoy::Logger::Context logging_state(spdlog::level::warn, + Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false); + for (auto _ : state) { + Envoy::Upstream::EdsSpeedTest speed_test(state, false); + uint32_t endpoints = skipExpensiveBenchmarks() ? 1 : state.range(0); + + speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true); + speed_test.priorityAndLocalityWeightedHelper(true, endpoints, false); } } -BENCHMARK(priorityAndLocalityWeighted)->Ranges({{false, true}, {false, true}, {2000, 100000}}); +BENCHMARK(healthOnlyUpdate)->Range(1, 100000)->Unit(benchmark::kMillisecond);