Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: adding a multi-envoy test #20016

Merged
merged 9 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions source/common/quic/active_quic_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ ActiveQuicListener::ActiveQuicListener(
ASSERT(GetQuicReloadableFlag(quic_single_ack_in_packet2));
ASSERT(!GetQuicFlag(FLAGS_quic_header_size_limit_includes_overhead));

if (Runtime::LoaderSingleton::getExisting()) {
enabled_.emplace(Runtime::FeatureFlag(enabled, runtime));
}
enabled_.emplace(Runtime::FeatureFlag(enabled, runtime));

quic::QuicRandom* const random = quic::QuicRandom::GetInstance();
random->RandBytes(random_seed_, sizeof(random_seed_));
Expand Down
10 changes: 4 additions & 6 deletions source/common/signal/fatal_error_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,10 @@ void registerFatalActions(FatalAction::FatalActionPtrList safe_actions,
FatalAction::FatalActionPtrList unsafe_actions,
Thread::ThreadFactory& thread_factory) {
// Create a FatalActionManager and store it.
FatalAction::FatalActionManager* previous_manager =
fatal_action_manager.exchange(new FatalAction::FatalActionManager(
std::move(safe_actions), std::move(unsafe_actions), thread_factory));

// Previous manager should be NULL.
ASSERT(!previous_manager);
if (!fatal_action_manager) {
fatal_action_manager.exchange(new FatalAction::FatalActionManager(
std::move(safe_actions), std::move(unsafe_actions), thread_factory));
}
}

FatalAction::Status runSafeActions() { return runFatalActions(FatalActionType::Safe); }
Expand Down
2 changes: 2 additions & 0 deletions source/common/singleton/threadsafe_singleton.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ template <class T> class ScopedInjectableLoader {
}
~ScopedInjectableLoader() { InjectableSingleton<T>::clear(); }

T& instance() { return *instance_; }

private:
std::unique_ptr<T> instance_;
};
Expand Down
2 changes: 1 addition & 1 deletion source/server/config_validation/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ValidationInstance final : Logger::Loggable<Logger::Id::main>,
ServerLifecycleNotifier& lifecycleNotifier() override { return *this; }
ListenerManager& listenerManager() override { return *listener_manager_; }
Secret::SecretManager& secretManager() override { return *secret_manager_; }
Runtime::Loader& runtime() override { return Runtime::LoaderSingleton::get(); }
Runtime::Loader& runtime() override { return runtime_singleton_->instance(); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have expected this to delegate to InstanceImpl::runtime(). But does that not work because ValidationInstance does not extend InstanceImpl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly. arguably I should do the same restart flag checks here, but I'lll get there in the PR I flip the runtime guard true by default.

void shutdown() override;
bool isShutdown() override { return false; }
void shutdownAdmin() override {}
Expand Down
31 changes: 20 additions & 11 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,12 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add

// Runtime gets initialized before the main configuration since during main configuration
// load things may grab a reference to the loader for later use.
runtime_singleton_ = std::make_unique<Runtime::ScopedLoaderSingleton>(
component_factory.createRuntime(*this, initial_config));
Runtime::LoaderPtr runtime_ptr = component_factory.createRuntime(*this, initial_config);
if (runtime_ptr->snapshot().getBoolean("envoy.restart_features.no_runtime_singleton", false)) {
runtime_ = std::move(runtime_ptr);
} else {
runtime_singleton_ = std::make_unique<Runtime::ScopedLoaderSingleton>(std::move(runtime_ptr));
}
initial_config.initAdminAccessLog(bootstrap_, *this);

if (initial_config.admin().address()) {
Expand All @@ -648,10 +652,10 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add
dns_resolver_factory.createDnsResolver(dispatcher(), api(), typed_dns_resolver_config);

cluster_manager_factory_ = std::make_unique<Upstream::ProdClusterManagerFactory>(
*admin_, Runtime::LoaderSingleton::get(), stats_store_, thread_local_, dns_resolver_,
*ssl_context_manager_, *dispatcher_, *local_info_, *secret_manager_,
messageValidationContext(), *api_, http_context_, grpc_context_, router_context_,
access_log_manager_, *singleton_manager_, options_, quic_stat_names_);
*admin_, runtime(), stats_store_, thread_local_, dns_resolver_, *ssl_context_manager_,
*dispatcher_, *local_info_, *secret_manager_, messageValidationContext(), *api_,
http_context_, grpc_context_, router_context_, access_log_manager_, *singleton_manager_,
options_, quic_stat_names_);

// Now the configuration gets parsed. The configuration may start setting
// thread local data per above. See MainImpl::initialize() for why ConfigImpl
Expand All @@ -675,7 +679,7 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add

// We have to defer RTDS initialization until after the cluster manager is
// instantiated (which in turn relies on runtime...).
Runtime::LoaderSingleton::get().initialize(clusterManager());
runtime().initialize(clusterManager());

clusterManager().setPrimaryClustersInitializedCb(
[this]() { onClusterManagerPrimaryInitializationComplete(); });
Expand Down Expand Up @@ -706,7 +710,7 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add

void InstanceImpl::onClusterManagerPrimaryInitializationComplete() {
// If RTDS was not configured the `onRuntimeReady` callback is immediately invoked.
Runtime::LoaderSingleton::get().startRtdsSubscriptions([this]() { onRuntimeReady(); });
runtime().startRtdsSubscriptions([this]() { onRuntimeReady(); });
}

void InstanceImpl::onRuntimeReady() {
Expand All @@ -730,8 +734,8 @@ void InstanceImpl::onRuntimeReady() {
Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, hds_config,
stats_store_, false)
->createUncachedRawAsyncClient(),
*dispatcher_, Runtime::LoaderSingleton::get(), stats_store_, *ssl_context_manager_,
info_factory_, access_log_manager_, *config_.clusterManager(), *local_info_, *admin_,
*dispatcher_, runtime(), stats_store_, *ssl_context_manager_, info_factory_,
access_log_manager_, *config_.clusterManager(), *local_info_, *admin_,
*singleton_manager_, thread_local_, messageValidationContext().dynamicValidationVisitor(),
*api_, options_);
}
Expand Down Expand Up @@ -931,7 +935,12 @@ void InstanceImpl::terminate() {
FatalErrorHandler::clearFatalActionsOnTerminate();
}

Runtime::Loader& InstanceImpl::runtime() { return Runtime::LoaderSingleton::get(); }
Runtime::Loader& InstanceImpl::runtime() {
if (runtime_singleton_.get()) {
return runtime_singleton_->instance();
}
return *runtime_;
}

void InstanceImpl::shutdown() {
ENVOY_LOG(info, "shutting down server instance");
Expand Down
1 change: 1 addition & 0 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
Singleton::ManagerPtr singleton_manager_;
Network::ConnectionHandlerPtr handler_;
std::unique_ptr<Runtime::ScopedLoaderSingleton> runtime_singleton_;
std::unique_ptr<Runtime::Loader> runtime_;
std::unique_ptr<Ssl::ContextManager> ssl_context_manager_;
ProdListenerComponentFactory listener_component_factory_;
ProdWorkerFactory worker_factory_;
Expand Down
10 changes: 10 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,16 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "multi_envoy_test",
srcs = [
"multi_envoy_test.cc",
],
deps = [
":http_protocol_integration_lib",
],
)

envoy_cc_test(
name = "multiplexed_upstream_integration_test",
srcs = [
Expand Down
67 changes: 41 additions & 26 deletions test/integration/base_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,14 @@ void BaseIntegrationTest::createUpstreams() {
}
}

void BaseIntegrationTest::createEnvoy() {
std::vector<uint32_t> ports;
for (auto& upstream : fake_upstreams_) {
if (upstream->localAddress()->ip()) {
ports.push_back(upstream->localAddress()->ip()->port());
}
}

if (use_lds_) {
std::string BaseIntegrationTest::finalizeConfigWithPorts(ConfigHelper& config_helper,
std::vector<uint32_t>& ports,
bool use_lds) {
if (use_lds) {
ENVOY_LOG_MISC(debug, "Setting up file-based LDS");
// Before finalization, set up a real lds path, replacing the default /dev/null
std::string lds_path = TestEnvironment::temporaryPath(TestUtility::uniqueFilename());
config_helper_.addConfigModifier(
config_helper.addConfigModifier(
[lds_path](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
bootstrap.mutable_dynamic_resources()->mutable_lds_config()->set_resource_api_version(
envoy::config::core::v3::V3);
Expand All @@ -180,15 +175,15 @@ void BaseIntegrationTest::createEnvoy() {
// Note that finalize assumes that every fake_upstream_ must correspond to a bootstrap config
// static entry. So, if you want to manually create a fake upstream without specifying it in the
// config, you will need to do so *after* initialize() (which calls this function) is done.
config_helper_.finalize(ports);
config_helper.finalize(ports);

envoy::config::bootstrap::v3::Bootstrap bootstrap = config_helper_.bootstrap();
if (use_lds_) {
envoy::config::bootstrap::v3::Bootstrap bootstrap = config_helper.bootstrap();
if (use_lds) {
// After the config has been finalized, write the final listener config to the lds file.
const std::string lds_path = config_helper_.bootstrap().dynamic_resources().lds_config().path();
const std::string lds_path = config_helper.bootstrap().dynamic_resources().lds_config().path();
envoy::service::discovery::v3::DiscoveryResponse lds;
lds.set_version_info("0");
for (auto& listener : config_helper_.bootstrap().static_resources().listeners()) {
for (auto& listener : config_helper.bootstrap().static_resources().listeners()) {
ProtobufWkt::Any* resource = lds.add_resources();
resource->PackFrom(listener);
}
Expand All @@ -204,6 +199,18 @@ void BaseIntegrationTest::createEnvoy() {

const std::string bootstrap_path = TestEnvironment::writeStringToFileForTest(
"bootstrap.pb", TestUtility::getProtobufBinaryStringFromMessage(bootstrap));
return bootstrap_path;
}

void BaseIntegrationTest::createEnvoy() {
std::vector<uint32_t> ports;
for (auto& upstream : fake_upstreams_) {
if (upstream->localAddress()->ip()) {
ports.push_back(upstream->localAddress()->ip()->port());
}
}

const std::string bootstrap_path = finalizeConfigWithPorts(config_helper_, ports, use_lds_);

std::vector<std::string> named_ports;
const auto& static_resources = config_helper_.bootstrap().static_resources();
Expand Down Expand Up @@ -291,12 +298,13 @@ void BaseIntegrationTest::setUpstreamAddress(
socket_address->set_port_value(fake_upstreams_[upstream_index]->localAddress()->ip()->port());
}

void BaseIntegrationTest::registerTestServerPorts(const std::vector<std::string>& port_names) {
void BaseIntegrationTest::registerTestServerPorts(const std::vector<std::string>& port_names,
IntegrationTestServerPtr& test_server) {
bool listeners_ready = false;
absl::Mutex l;
std::vector<std::reference_wrapper<Network::ListenerConfig>> listeners;
test_server_->server().dispatcher().post([this, &listeners, &listeners_ready, &l]() {
listeners = test_server_->server().listenerManager().listeners();
test_server->server().dispatcher().post([&listeners, &listeners_ready, &l, &test_server]() {
listeners = test_server->server().listenerManager().listeners();
l.Lock();
listeners_ready = true;
l.Unlock();
Expand All @@ -314,7 +322,7 @@ void BaseIntegrationTest::registerTestServerPorts(const std::vector<std::string>
}
}
const auto admin_addr =
test_server_->server().admin().socket().connectionInfoProvider().localAddress();
test_server->server().admin().socket().connectionInfoProvider().localAddress();
if (admin_addr->type() == Network::Address::Type::Ip) {
registerPort("admin", admin_addr->ip()->port());
}
Expand All @@ -330,8 +338,15 @@ std::string getListenerDetails(Envoy::Server::Instance& server) {
void BaseIntegrationTest::createGeneratedApiTestServer(
const std::string& bootstrap_path, const std::vector<std::string>& port_names,
Server::FieldValidationConfig validator_config, bool allow_lds_rejection) {
createGeneratedApiTestServer(bootstrap_path, port_names, validator_config, allow_lds_rejection,
test_server_);
}

test_server_ = IntegrationTestServer::create(
void BaseIntegrationTest::createGeneratedApiTestServer(
const std::string& bootstrap_path, const std::vector<std::string>& port_names,
Server::FieldValidationConfig validator_config, bool allow_lds_rejection,
IntegrationTestServerPtr& test_server) {
test_server = IntegrationTestServer::create(
bootstrap_path, version_, on_server_ready_function_, on_server_init_function_,
deterministic_value_, timeSystem(), *api_, defer_listener_finalization_, process_object_,
validator_config, concurrency_, drain_time_, drain_strategy_, proxy_buffer_factory_,
Expand All @@ -346,27 +361,27 @@ void BaseIntegrationTest::createGeneratedApiTestServer(
Event::TestTimeSystem::RealTimeBound bound(2 * TestUtility::DefaultTimeout);
const char* success = "listener_manager.listener_create_success";
const char* rejected = "listener_manager.lds.update_rejected";
for (Stats::CounterSharedPtr success_counter = test_server_->counter(success),
rejected_counter = test_server_->counter(rejected);
for (Stats::CounterSharedPtr success_counter = test_server->counter(success),
rejected_counter = test_server->counter(rejected);
(success_counter == nullptr ||
success_counter->value() <
concurrency_ * config_helper_.bootstrap().static_resources().listeners_size()) &&
(!allow_lds_rejection || rejected_counter == nullptr || rejected_counter->value() == 0);
success_counter = test_server_->counter(success),
rejected_counter = test_server_->counter(rejected)) {
success_counter = test_server->counter(success),
rejected_counter = test_server->counter(rejected)) {
if (!bound.withinBound()) {
RELEASE_ASSERT(0, "Timed out waiting for listeners.");
}
if (!allow_lds_rejection) {
RELEASE_ASSERT(rejected_counter == nullptr || rejected_counter->value() == 0,
absl::StrCat("Lds update failed. Details\n",
getListenerDetails(test_server_->server())));
getListenerDetails(test_server->server())));
}
// TODO(mattklein123): Switch to events and waitFor().
time_system_.realSleepDoNotUseWithoutScrutiny(std::chrono::milliseconds(10));
}

registerTestServerPorts(port_names);
registerTestServerPorts(port_names, test_server);
}
}

Expand Down
12 changes: 11 additions & 1 deletion test/integration/base_integration_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class BaseIntegrationTest : protected Logger::Loggable<Logger::Id::testing> {
makeClientConnectionWithOptions(uint32_t port,
const Network::ConnectionSocket::OptionsSharedPtr& options);

void registerTestServerPorts(const std::vector<std::string>& port_names);
void registerTestServerPorts(const std::vector<std::string>& port_names,
IntegrationTestServerPtr& test_server);
void createGeneratedApiTestServer(const std::string& bootstrap_path,
const std::vector<std::string>& port_names,
Server::FieldValidationConfig validator_config,
Expand All @@ -104,6 +105,12 @@ class BaseIntegrationTest : protected Logger::Loggable<Logger::Id::testing> {
Server::FieldValidationConfig validator_config,
bool allow_lds_rejection);

void createGeneratedApiTestServer(const std::string& bootstrap_path,
const std::vector<std::string>& port_names,
Server::FieldValidationConfig validator_config,
bool allow_lds_rejection,
IntegrationTestServerPtr& test_server);

Event::TestTimeSystem& timeSystem() { return time_system_; }

Stats::IsolatedStoreImpl stats_store_;
Expand Down Expand Up @@ -333,6 +340,9 @@ class BaseIntegrationTest : protected Logger::Loggable<Logger::Id::testing> {
}

protected:
static std::string finalizeConfigWithPorts(ConfigHelper& helper, std::vector<uint32_t>& ports,
bool use_lds);

void setUdpFakeUpstream(absl::optional<FakeUpstreamConfig::UdpConfig> config) {
upstream_config_.udp_fake_upstream_ = config;
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/http_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ void HttpIntegrationTest::initialize() {
config_helper_.addQuicDownstreamTransportSocketConfig();

BaseIntegrationTest::initialize();
registerTestServerPorts({"http"});
registerTestServerPorts({"http"}, test_server_);

Network::Address::InstanceConstSharedPtr server_addr = Network::Utility::resolveUrl(fmt::format(
"udp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), lookupPort("http")));
Expand Down
64 changes: 64 additions & 0 deletions test/integration/multi_envoy_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#include "test/integration/http_protocol_integration.h"

namespace Envoy {
namespace {

class MultiEnvoyTest : public HttpProtocolIntegrationTest {
public:
// Create an envoy in front of the original Envoy.
void createL1Envoy();
~MultiEnvoyTest() { test_server_.reset(); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: destructors usually come before other methods. I'm mildly surprised this doesn't need to be marked override, but C++ is weird.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it probably does and clang tidy will catch me :-P Will fix.


IntegrationTestServerPtr l1_server_;
Thread::SkipAsserts skip_;
};

void MultiEnvoyTest::createL1Envoy() {
std::vector<uint32_t> ports;
std::vector<uint32_t> zero;
int l2_port = lookupPort("http");
for (auto& upstream : fake_upstreams_) {
if (upstream->localAddress()->ip()) {
ports.push_back(l2_port);
zero.push_back(0);
}
}
ConfigHelper l1_helper(version_, *api_,
MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap()));
l1_helper.setPorts(zero, true); // Zero out ports set by config_helper_'s finalize();
const std::string bootstrap_path = finalizeConfigWithPorts(l1_helper, ports, use_lds_);

std::vector<std::string> named_ports;
const auto& static_resources = config_helper_.bootstrap().static_resources();
named_ports.reserve(static_resources.listeners_size());
for (int i = 0; i < static_resources.listeners_size(); ++i) {
named_ports.push_back(static_resources.listeners(i).name() + "_l1");
}
createGeneratedApiTestServer(bootstrap_path, named_ports, {false, true, false}, false,
l1_server_);
}

INSTANTIATE_TEST_SUITE_P(IpVersions, MultiEnvoyTest,
testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams(
{Http::CodecType::HTTP1}, {Http::CodecType::HTTP1})),
HttpProtocolIntegrationTest::protocolTestParamsToString);

// A simple test to make sure that traffic can flow between client - L1 - L2 - upstream.
// This test does not currently support mixed protocol hops, or much of the other envoy test
// framework knobs.
TEST_P(MultiEnvoyTest, SimpleRequestAndResponse) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Would it also make sense (perhaps as a follow up?) to have a test in which we create 2 distinct Envoy instances, both in front of the same (or different, I suppose) upstreams. Then we could verify that requests flow through both, close one, and verify that the other still works?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I also plan on having tests where I tear down the L2 and verify the L1 500s correctly, or tear down L1 and make sure I can talk directly to L2 -> client, to ensure lifetime issues are really sorted and regression tested against.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having some tests that verify different shutdown ordering would be useful I think, just to make sure that they can exist independently of each other

config_helper_.addRuntimeOverride("envoy.restart_features.no_runtime_singleton", "true");
initialize();
createL1Envoy();

codec_client_ = makeHttpConnection(lookupPort("http_l1"));
auto response =
sendRequestAndWaitForResponse(default_request_headers_, 0, default_response_headers_, 0);
EXPECT_EQ(1, test_server_->counter("http.config_test.rq_total"));
EXPECT_EQ(1, l1_server_->counter("http.config_test.rq_total"));

l1_server_.reset();
}

} // namespace
} // namespace Envoy