diff --git a/envoy/server/admin.h b/envoy/server/admin.h index 251b0cf47b44..48c1629fef44 100644 --- a/envoy/server/admin.h +++ b/envoy/server/admin.h @@ -281,6 +281,11 @@ class Admin { * Closes the listening socket for the admin. */ virtual void closeSocket() PURE; + + /** + * Creates a streaming request context from the url path in the admin stream. + */ + virtual RequestPtr makeRequest(AdminStream& admin_stream) const PURE; }; } // namespace Server diff --git a/source/exe/BUILD b/source/exe/BUILD index a4651aae9b4f..0f7947c1d07a 100644 --- a/source/exe/BUILD +++ b/source/exe/BUILD @@ -7,6 +7,7 @@ load( "envoy_cc_posix_without_linux_library", "envoy_cc_win32_library", "envoy_package", + "envoy_select_admin_functionality", "envoy_select_enable_http3", "envoy_select_signal_trace", ) @@ -102,7 +103,7 @@ envoy_cc_library( hdrs = [ "main_common.h", ], - deps = [ + deps = envoy_select_admin_functionality([":admin_response_lib"]) + [ ":platform_impl_lib", ":process_wide_lib", ":stripped_main_base_lib", @@ -118,6 +119,19 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "admin_response_lib", + srcs = ["admin_response.cc"], + hdrs = ["admin_response.h"], + deps = [ + "//source/common/buffer:buffer_lib", + "//source/common/http:header_map_lib", + "//source/server:server_lib", + "//source/server/admin:admin_lib", + "//source/server/admin:utils_lib", + ], +) + envoy_cc_library( name = "main_common_with_all_extensions_lib", deps = [ diff --git a/source/exe/admin_response.cc b/source/exe/admin_response.cc new file mode 100644 index 000000000000..0c1ab0958bec --- /dev/null +++ b/source/exe/admin_response.cc @@ -0,0 +1,191 @@ +#include "source/exe/admin_response.h" + +#include "envoy/server/admin.h" + +#include "source/server/admin/admin_filter.h" +#include "source/server/admin/utils.h" + +namespace Envoy { + +AdminResponse::AdminResponse(Server::Instance& server, absl::string_view path, + absl::string_view method, SharedPtrSet response_set) + : server_(server), opt_admin_(server.admin()), shared_response_set_(response_set) { + request_headers_->setMethod(method); + request_headers_->setPath(path); +} + +AdminResponse::~AdminResponse() { + cancel(); + shared_response_set_->detachResponse(this); +} + +void AdminResponse::getHeaders(HeadersFn fn) { + auto request_headers = [response = shared_from_this()]() { response->requestHeaders(); }; + + // First check for cancelling or termination. + { + absl::MutexLock lock(&mutex_); + ASSERT(headers_fn_ == nullptr); + if (cancelled_) { + return; + } + headers_fn_ = fn; + if (terminated_ || !opt_admin_) { + sendErrorLockHeld(); + return; + } + } + server_.dispatcher().post(request_headers); +} + +void AdminResponse::nextChunk(BodyFn fn) { + auto request_next_chunk = [response = shared_from_this()]() { response->requestNextChunk(); }; + + // Note the caller may race a call to nextChunk with the server being + // terminated. + { + absl::MutexLock lock(&mutex_); + ASSERT(body_fn_ == nullptr); + if (cancelled_) { + return; + } + body_fn_ = fn; + if (terminated_ || !opt_admin_) { + sendAbortChunkLockHeld(); + return; + } + } + + // Note that nextChunk may be called from any thread -- it's the callers choice, + // including the Envoy main thread, which would occur if the caller initiates + // the request of a chunk upon receipt of the previous chunk. + // + // In that case it may race against the AdminResponse object being deleted, + // in which case the callbacks, held in a shared_ptr, will be cancelled + // from the destructor. If that happens *before* we post to the main thread, + // we will just skip and never call fn. + server_.dispatcher().post(request_next_chunk); +} + +// Called by the user if it is not longer interested in the result of the +// admin request. After calling cancel() the caller must not call nextChunk or +// getHeaders. +void AdminResponse::cancel() { + absl::MutexLock lock(&mutex_); + cancelled_ = true; + headers_fn_ = nullptr; + body_fn_ = nullptr; +} + +bool AdminResponse::cancelled() const { + absl::MutexLock lock(&mutex_); + return cancelled_; +} + +// Called from terminateAdminRequests when the Envoy server +// terminates. After this is called, the caller may need to complete the +// admin response, and so calls to getHeader and nextChunk remain valid, +// resulting in 503 and an empty body. +void AdminResponse::terminate() { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + absl::MutexLock lock(&mutex_); + if (!terminated_) { + terminated_ = true; + sendErrorLockHeld(); + sendAbortChunkLockHeld(); + } +} + +void AdminResponse::requestHeaders() { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + { + absl::MutexLock lock(&mutex_); + if (cancelled_ || terminated_) { + return; + } + } + Server::AdminFilter filter(*opt_admin_); + filter.decodeHeaders(*request_headers_, false); + request_ = opt_admin_->makeRequest(filter); + code_ = request_->start(*response_headers_); + { + absl::MutexLock lock(&mutex_); + if (headers_fn_ == nullptr || cancelled_) { + return; + } + Server::Utility::populateFallbackResponseHeaders(code_, *response_headers_); + headers_fn_(code_, *response_headers_); + headers_fn_ = nullptr; + } +} + +void AdminResponse::requestNextChunk() { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + { + absl::MutexLock lock(&mutex_); + if (cancelled_ || terminated_ || !more_data_) { + return; + } + } + ASSERT(response_.length() == 0); + more_data_ = request_->nextChunk(response_); + { + absl::MutexLock lock(&mutex_); + if (sent_end_stream_ || cancelled_) { + return; + } + sent_end_stream_ = !more_data_; + body_fn_(response_, more_data_); + ASSERT(response_.length() == 0); + body_fn_ = nullptr; + } +} + +void AdminResponse::sendAbortChunkLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + if (!sent_end_stream_ && body_fn_ != nullptr) { + response_.drain(response_.length()); + body_fn_(response_, false); + sent_end_stream_ = true; + } + body_fn_ = nullptr; +} + +void AdminResponse::sendErrorLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + if (headers_fn_ != nullptr) { + code_ = Http::Code::InternalServerError; + Server::Utility::populateFallbackResponseHeaders(code_, *response_headers_); + headers_fn_(code_, *response_headers_); + headers_fn_ = nullptr; + } +} + +void AdminResponse::PtrSet::terminateAdminRequests() { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + + absl::MutexLock lock(&mutex_); + accepting_admin_requests_ = false; + for (AdminResponse* response : response_set_) { + // Consider the possibility of response being deleted due to its creator + // dropping its last reference right here. From its destructor it will call + // detachResponse(), which is mutex-ed against this loop, so before the + // memory becomes invalid, the call to terminate will complete. + response->terminate(); + } + response_set_.clear(); +} + +void AdminResponse::PtrSet::attachResponse(AdminResponse* response) { + absl::MutexLock lock(&mutex_); + if (accepting_admin_requests_) { + response_set_.insert(response); + } else { + response->terminate(); + } +} + +void AdminResponse::PtrSet::detachResponse(AdminResponse* response) { + absl::MutexLock lock(&mutex_); + response_set_.erase(response); +} + +} // namespace Envoy diff --git a/source/exe/admin_response.h b/source/exe/admin_response.h new file mode 100644 index 000000000000..b3683b5707c7 --- /dev/null +++ b/source/exe/admin_response.h @@ -0,0 +1,188 @@ +#pragma once + +#include + +#include "envoy/server/instance.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/common/http/header_map_impl.h" + +#include "absl/container/flat_hash_set.h" +#include "absl/synchronization/mutex.h" + +namespace Envoy { + +class AdminResponse; + +// Holds context for a streaming response from the admin system, enabling +// flow-control into another system. This is particularly important when the +// generated response is very large, such that holding it in memory may cause +// fragmentation or out-of-memory failures. It is possible to interleave xDS +// response handling, overload management, and other admin requests during the +// streaming of a long admin response. +// +// There can be be multiple AdminResponses at a time; each are separately +// managed. However they will obtain their data from Envoy functions that +// run on the main thread. +// +// Responses may still be active after the server has shut down, and is no +// longer running its main thread dispatcher. In this state, the callbacks +// will be called with appropriate error codes. +// +// Requests can also be cancelled explicitly by calling cancel(). After +// cancel() is called, no further callbacks will be called by the response. +// +// The lifecycle of an AdminResponse is rendered as a finite state machine +// bubble diagram: +// https://docs.google.com/drawings/d/1njUl1twApEMoxmjaG4b7optTh5fcb_YNcfSnkHbdfq0/view +class AdminResponse : public std::enable_shared_from_this { +public: + // AdminResponse can outlive MainCommonBase. But AdminResponse needs a + // reliable way of knowing whether MainCommonBase is alive, so we do this with + // PtrSet, which is held by MainCommonBase and all the active AdminResponses. + // via shared_ptr. This gives MainCommonBase a reliable way of notifying all + // active responses that it is being shut down, and thus all responses need to + // be terminated. And it gives a reliable way for AdminResponse to detach + // itself, whether or not MainCommonBase is already deleted. + // + // In summary: + // * MainCommonBase can outlive AdminResponse so we need detachResponse. + // * AdminResponse can outlive MainCommonBase, so we need shared_ptr. + class PtrSet { + public: + /** + * Called when an AdminResponse is created. When terminateAdminRequests is + * called, all outstanding response objects have their terminate() methods + * called. + * + * @param response the response pointer to be added to the set. + */ + void attachResponse(AdminResponse* response); + + /** + * Called when an AdminResponse is terminated, either by completing normally + * or having the caller call cancel on it. Either way it needs to be removed + * from the set that will be used by terminateAdminRequests below. + * + * @param response the response pointer to be removed from the set. + */ + void detachResponse(AdminResponse* response); + + /** + * Called after the server run-loop finishes; any outstanding streaming + * admin requests will otherwise hang as the main-thread dispatcher loop + * will no longer run. + */ + void terminateAdminRequests(); + + mutable absl::Mutex mutex_; + absl::flat_hash_set response_set_ ABSL_GUARDED_BY(mutex_); + bool accepting_admin_requests_ ABSL_GUARDED_BY(mutex_) = true; + }; + using SharedPtrSet = std::shared_ptr; + + AdminResponse(Server::Instance& server, absl::string_view path, absl::string_view method, + SharedPtrSet response_set); + ~AdminResponse(); + + /** + * Requests the headers for the response. This can be called from any + * thread, and HeaderFn may also be called from any thread. + * + * HeadersFn will not be called after cancel(). It is invalid to + * to call nextChunk from within HeadersFn -- the caller must trigger + * such a call on another thread, after HeadersFn returns. Calling + * nextChunk from HeadersFn may deadlock. + * + * If the server is shut down during the operation, headersFn may + * be called with a 503, if it has not already been called. + * + * @param fn The function to be called with the headers and status code. + */ + using HeadersFn = std::function; + void getHeaders(HeadersFn fn); + + /** + * Requests a new chunk. This can be called from any thread, and the BodyFn + * callback may also be called from any thread. BodyFn will be called in a + * loop until the Buffer passed to it is fully drained. When 'false' is + * passed as the second arg to BodyFn, that signifies the end of the + * response, and nextChunk must not be called again. + * + * BodyFn will not be called after cancel(). It is invalid to + * to call nextChunk from within BodyFn -- the caller must trigger + * such a call on another thread, after BodyFn returns. Calling + * nextChunk from BodyFn may deadlock. + * + * If the server is shut down during the operation, bodyFn will + * be called with an empty body and 'false' for more_data, if + * this has not already occurred. + * + * @param fn A function to be called on each chunk. + */ + using BodyFn = std::function; + void nextChunk(BodyFn fn); + + /** + * Requests that any outstanding callbacks be dropped. This can be called + * when the context in which the request is made is destroyed. This enables + * an application to implement a. The Response itself is held as a + * shared_ptr as that makes it much easier to manage cancellation across + * multiple threads. + */ + void cancel(); + + /** + * @return whether the request was cancelled. + */ + bool cancelled() const; + +private: + /** + * Called when the server is terminated. This calls any outstanding + * callbacks to be called. If nextChunk is called after termination, + * its callback is called false for the second arg, indicating + * end of stream. + */ + void terminate(); + + void requestHeaders(); + void requestNextChunk(); + void sendAbortChunkLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void sendErrorLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + Server::Instance& server_; + OptRef opt_admin_; + Buffer::OwnedImpl response_; + Http::Code code_; + Server::Admin::RequestPtr request_; + Http::RequestHeaderMapPtr request_headers_{Http::RequestHeaderMapImpl::create()}; + Http::ResponseHeaderMapPtr response_headers_{Http::ResponseHeaderMapImpl::create()}; + bool more_data_ = true; + + // True if cancel() was explicitly called by the user; headers and body + // callbacks are never called after cancel(). + bool cancelled_ ABSL_GUARDED_BY(mutex_) = false; + + // True if the Envoy server has stopped running its main loop. Headers and + // body requests can be initiated and called back are called after terminate, + // so callers do not have to special case this -- the request will simply fail + // with an empty response. + bool terminated_ ABSL_GUARDED_BY(mutex_) = false; + + // Used to indicate whether the body function has been called with false + // as its second argument. That must always happen at most once, even + // if terminate races with the normal end-of-stream marker. more=false + // may never be sent if the request is cancelled, nor deleted prior to + // it being requested. + bool sent_end_stream_ ABSL_GUARDED_BY(mutex_) = false; + + HeadersFn headers_fn_ ABSL_GUARDED_BY(mutex_); + BodyFn body_fn_ ABSL_GUARDED_BY(mutex_); + mutable absl::Mutex mutex_; + + SharedPtrSet shared_response_set_; +}; +using AdminResponseSharedPtr = std::shared_ptr; + +} // namespace Envoy diff --git a/source/exe/main_common.cc b/source/exe/main_common.cc index d18a575f2dd0..87b3342f3335 100644 --- a/source/exe/main_common.cc +++ b/source/exe/main_common.cc @@ -10,6 +10,7 @@ #include "source/common/common/compiler_requirements.h" #include "source/common/common/logger.h" #include "source/common/common/perf_annotation.h" +#include "source/common/common/thread.h" #include "source/common/network/utility.h" #include "source/common/stats/thread_local_store.h" #include "source/exe/platform_impl.h" @@ -56,26 +57,46 @@ MainCommonBase::MainCommonBase(const Server::Options& options, Event::TimeSystem std::unique_ptr process_context) : StrippedMainBase(options, time_system, listener_hooks, component_factory, std::move(platform_impl), std::move(random_generator), - std::move(process_context), createFunction()) {} + std::move(process_context), createFunction()) +#ifdef ENVOY_ADMIN_FUNCTIONALITY + , + shared_response_set_(std::make_shared()) +#endif +{ +} bool MainCommonBase::run() { + // Avoid returning from inside switch cases to minimize uncovered lines + // while avoiding gcc warnings by hitting the final return. + bool ret = false; + switch (options_.mode()) { case Server::Mode::Serve: runServer(); - return true; +#ifdef ENVOY_ADMIN_FUNCTIONALITY + shared_response_set_->terminateAdminRequests(); +#endif + ret = true; + break; case Server::Mode::Validate: - return Server::validateConfig( + ret = Server::validateConfig( options_, Network::Utility::getLocalAddress(options_.localAddressIpVersion()), component_factory_, platform_impl_->threadFactory(), platform_impl_->fileSystem(), process_context_ ? ProcessContextOptRef(std::ref(*process_context_)) : absl::nullopt); + break; case Server::Mode::InitOnly: PERF_DUMP(); - return true; + ret = true; + break; } - return false; // for gcc. + return ret; } #ifdef ENVOY_ADMIN_FUNCTIONALITY + +// This request variant buffers the entire response in one string. New uses +// should opt for the streaming version below, where an AdminResponse object +// is created and used to stream data with flow-control. void MainCommonBase::adminRequest(absl::string_view path_and_query, absl::string_view method, const AdminRequestFn& handler) { std::string path_and_query_buf = std::string(path_and_query); @@ -89,6 +110,14 @@ void MainCommonBase::adminRequest(absl::string_view path_and_query, absl::string handler(*response_headers, body); }); } + +AdminResponseSharedPtr MainCommonBase::adminRequest(absl::string_view path_and_query, + absl::string_view method) { + auto response = + std::make_shared(*server(), path_and_query, method, shared_response_set_); + shared_response_set_->attachResponse(response.get()); + return response; +} #endif MainCommon::MainCommon(const std::vector& args) diff --git a/source/exe/main_common.h b/source/exe/main_common.h index 349decdec0cc..500f293ce7f1 100644 --- a/source/exe/main_common.h +++ b/source/exe/main_common.h @@ -10,6 +10,10 @@ #include "source/common/stats/symbol_table.h" #include "source/common/stats/thread_local_store.h" #include "source/common/thread_local/thread_local_impl.h" + +#ifdef ENVOY_ADMIN_FUNCTIONALITY +#include "source/exe/admin_response.h" +#endif #include "source/exe/process_wide.h" #include "source/exe/stripped_main_base.h" #include "source/server/listener_hooks.h" @@ -37,20 +41,45 @@ class MainCommonBase : public StrippedMainBase { using AdminRequestFn = std::function; - // Makes an admin-console request by path, calling handler() when complete. - // The caller can initiate this from any thread, but it posts the request - // onto the main thread, so the handler is called asynchronously. - // - // This is designed to be called from downstream consoles, so they can access - // the admin console information stream without opening up a network port. - // - // This should only be called while run() is active; ensuring this is the - // responsibility of the caller. - // - // TODO(jmarantz): consider std::future for encapsulating this delayed request - // semantics, rather than a handler callback. + /** + * Makes an admin-console request by path, calling handler() when complete. + * The caller can initiate this from any thread, but it posts the request + * onto the main thread, so the handler is called asynchronously. + * + * This is designed to be called from downstream consoles, so they can access + * the admin console information stream without opening up a network port. + * + * This should only be called while run() is active; ensuring this is the + * responsibility of the caller. + * + * TODO(jmarantz): consider std::future for encapsulating this delayed request + * semantics, rather than a handler callback. + * + * Consider using the 2-arg version of adminRequest, below, which enables + * streaming of large responses one chunk at a time, without holding + * potentially huge response text in memory. + * + * @param path_and_query the URL to send to admin, including any query params. + * @param method the HTTP method: "GET" or "POST" + * @param handler an async callback that will be sent the serialized headers + * and response. + */ void adminRequest(absl::string_view path_and_query, absl::string_view method, const AdminRequestFn& handler); + + /** + * Initiates a streaming response to an admin request. The caller interacts + * with the returned AdminResponse object, and can thus control the pace of + * handling chunks of response text. + * + * @param path_and_query the URL to send to admin, including any query params. + * @param method the HTTP method: "GET" or "POST" + * @return AdminResponseSharedPtr the response object + */ + AdminResponseSharedPtr adminRequest(absl::string_view path_and_query, absl::string_view method); + +private: + AdminResponse::SharedPtrSet shared_response_set_; #endif }; @@ -82,6 +111,9 @@ class MainCommon { const MainCommonBase::AdminRequestFn& handler) { base_.adminRequest(path_and_query, method, handler); } + AdminResponseSharedPtr adminRequest(absl::string_view path_and_query, absl::string_view method) { + return base_.adminRequest(path_and_query, method); + } #endif static std::string hotRestartVersion(bool hot_restart_enabled); diff --git a/source/server/admin/BUILD b/source/server/admin/BUILD index 3913191068d4..1c453e712a8e 100644 --- a/source/server/admin/BUILD +++ b/source/server/admin/BUILD @@ -388,6 +388,7 @@ envoy_cc_library( name = "utils_lib", srcs = ["utils.cc"], hdrs = ["utils.h"], + visibility = ["//visibility:public"], deps = [ "//envoy/init:manager_interface", "//source/common/common:enum_to_int", diff --git a/source/server/admin/admin.cc b/source/server/admin/admin.cc index 07f3b271381a..0b21be7fef7f 100644 --- a/source/server/admin/admin.cc +++ b/source/server/admin/admin.cc @@ -293,7 +293,7 @@ bool AdminImpl::createNetworkFilterChain(Network::Connection& connection, bool AdminImpl::createFilterChain(Http::FilterChainManager& manager, bool, const Http::FilterChainOptions&) const { Http::FilterFactoryCb factory = [this](Http::FilterChainFactoryCallbacks& callbacks) { - callbacks.addStreamFilter(std::make_shared(createRequestFunction())); + callbacks.addStreamFilter(std::make_shared(*this)); }; manager.applyFilterFactoryCb({}, factory); return true; @@ -494,7 +494,7 @@ bool AdminImpl::removeHandler(const std::string& prefix) { Http::Code AdminImpl::request(absl::string_view path_and_query, absl::string_view method, Http::ResponseHeaderMap& response_headers, std::string& body) { - AdminFilter filter(createRequestFunction()); + AdminFilter filter(*this); auto request_headers = Http::RequestHeaderMapImpl::create(); request_headers->setMethod(method); diff --git a/source/server/admin/admin.h b/source/server/admin/admin.h index 86c6ab7f57d3..ef0cde79927b 100644 --- a/source/server/admin/admin.h +++ b/source/server/admin/admin.h @@ -216,9 +216,6 @@ class AdminImpl : public Admin, void closeSocket() override; void addListenerToHandler(Network::ConnectionHandler* handler) override; - GenRequestFn createRequestFunction() const { - return [this](AdminStream& admin_stream) -> RequestPtr { return makeRequest(admin_stream); }; - } uint64_t maxRequestsPerConnection() const override { return 0; } const HttpConnectionManagerProto::ProxyStatusConfig* proxyStatusConfig() const override { return proxy_status_config_.get(); @@ -246,10 +243,7 @@ class AdminImpl : public Admin, ::Envoy::Http::HeaderValidatorStats& getHeaderValidatorStats(Http::Protocol protocol); #endif - /** - * Creates a Request from the request in the admin stream. - */ - RequestPtr makeRequest(AdminStream& admin_stream) const; + RequestPtr makeRequest(AdminStream& admin_stream) const override; /** * Creates a UrlHandler structure from a non-chunked callback. diff --git a/source/server/admin/admin_filter.cc b/source/server/admin/admin_filter.cc index 7a11c251dc88..d9dee2576612 100644 --- a/source/server/admin/admin_filter.cc +++ b/source/server/admin/admin_filter.cc @@ -5,8 +5,7 @@ namespace Envoy { namespace Server { -AdminFilter::AdminFilter(Admin::GenRequestFn admin_handler_fn) - : admin_handler_fn_(admin_handler_fn) {} +AdminFilter::AdminFilter(const Admin& admin) : admin_(admin) {} Http::FilterHeadersStatus AdminFilter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) { @@ -87,12 +86,13 @@ void AdminFilter::onComplete() { auto header_map = Http::ResponseHeaderMapImpl::create(); RELEASE_ASSERT(request_headers_, ""); - Admin::RequestPtr handler = admin_handler_fn_(*this); + Admin::RequestPtr handler = admin_.makeRequest(*this); Http::Code code = handler->start(*header_map); Utility::populateFallbackResponseHeaders(code, *header_map); decoder_callbacks_->encodeHeaders(std::move(header_map), false, StreamInfo::ResponseCodeDetails::get().AdminFilterResponse); + // TODO(#31087): use high/lower watermarks to apply flow-control to the admin http port. bool more_data; do { Buffer::OwnedImpl response; diff --git a/source/server/admin/admin_filter.h b/source/server/admin/admin_filter.h index e163029b2283..2dbe6d01df94 100644 --- a/source/server/admin/admin_filter.h +++ b/source/server/admin/admin_filter.h @@ -28,7 +28,13 @@ class AdminFilter : public Http::PassThroughFilter, absl::string_view path_and_query, Http::ResponseHeaderMap& response_headers, Buffer::OwnedImpl& response, AdminFilter& filter)>; - AdminFilter(Admin::GenRequestFn admin_handler_func); + /** + * Instantiates an AdminFilter. + * + * @param admin the admin context from which to create the filter. This is used + * to create a request object based on the path. + */ + AdminFilter(const Admin& admin); // Http::StreamFilterBase // Handlers relying on the reference should use addOnDestroyCallback() @@ -58,7 +64,7 @@ class AdminFilter : public Http::PassThroughFilter, * Called when an admin request has been completely received. */ void onComplete(); - Admin::GenRequestFn admin_handler_fn_; + const Admin& admin_; Http::RequestHeaderMap* request_headers_{}; std::list> on_destroy_callbacks_; bool end_stream_on_complete_ = true; diff --git a/source/server/config_validation/admin.h b/source/server/config_validation/admin.h index 173afb29d355..8e75607e3f8f 100644 --- a/source/server/config_validation/admin.h +++ b/source/server/config_validation/admin.h @@ -39,6 +39,7 @@ class ValidationAdmin : public Admin { void addListenerToHandler(Network::ConnectionHandler* handler) override; uint32_t concurrency() const override { return 1; } void closeSocket() override {} + RequestPtr makeRequest(AdminStream&) const override { return nullptr; } private: ConfigTrackerImpl config_tracker_; diff --git a/test/exe/BUILD b/test/exe/BUILD index 7be94770af65..065b80e97014 100644 --- a/test/exe/BUILD +++ b/test/exe/BUILD @@ -1,6 +1,7 @@ load( "//bazel:envoy_build_system.bzl", "envoy_cc_test", + "envoy_cc_test_library", "envoy_package", "envoy_select_admin_functionality", "envoy_sh_test", @@ -60,13 +61,50 @@ envoy_sh_test( ], ) +envoy_cc_test_library( + name = "main_common_test_base_lib", + srcs = ["main_common_test_base.cc"], + hdrs = ["main_common_test_base.h"], + data = [ + "//test/config/integration:google_com_proxy_port_0", + ], + deps = [ + "//source/common/api:api_lib", + "//source/common/stats:isolated_store_lib", + "//source/exe:envoy_main_common_with_core_extensions_lib", + "//source/exe:platform_impl_lib", + "//source/extensions/clusters/logical_dns:logical_dns_cluster_lib", + "//test/mocks/runtime:runtime_mocks", + "//test/test_common:contention_lib", + "//test/test_common:environment_lib", + "//test/test_common:thread_factory_for_test_lib", + ], +) + envoy_cc_test( name = "main_common_test", srcs = envoy_select_admin_functionality(["main_common_test.cc"]), data = [ "//test/config/integration:google_com_proxy_port_0", ], - deps = [ + deps = envoy_select_admin_functionality([":main_common_test_base_lib"]) + [ + "//source/common/api:api_lib", + "//source/exe:envoy_main_common_with_core_extensions_lib", + "//source/exe:platform_impl_lib", + "//source/extensions/clusters/logical_dns:logical_dns_cluster_lib", + "//test/mocks/runtime:runtime_mocks", + "//test/test_common:contention_lib", + "//test/test_common:environment_lib", + ], +) + +envoy_cc_test( + name = "admin_response_test", + srcs = envoy_select_admin_functionality(["admin_response_test.cc"]), + data = [ + "//test/config/integration:google_com_proxy_port_0", + ], + deps = envoy_select_admin_functionality([":main_common_test_base_lib"]) + [ "//source/common/api:api_lib", "//source/exe:envoy_main_common_with_core_extensions_lib", "//source/exe:platform_impl_lib", diff --git a/test/exe/admin_response_test.cc b/test/exe/admin_response_test.cc new file mode 100644 index 000000000000..33a7f10248e5 --- /dev/null +++ b/test/exe/admin_response_test.cc @@ -0,0 +1,351 @@ +#include "test/exe/main_common_test_base.h" + +#include "gtest/gtest.h" + +namespace Envoy { + +class AdminStreamingTest : public AdminRequestTestBase, public testing::Test { +protected: + static constexpr absl::string_view StreamingEndpoint = "/stream"; + + class StreamingAdminRequest : public Envoy::Server::Admin::Request { + public: + static constexpr uint64_t NumChunks = 10; + static constexpr uint64_t BytesPerChunk = 10000; + + StreamingAdminRequest(std::function& get_headers_hook, + std::function& next_chunk_hook) + : chunk_(BytesPerChunk, 'a'), get_headers_hook_(get_headers_hook), + next_chunk_hook_(next_chunk_hook) {} + Http::Code start(Http::ResponseHeaderMap&) override { + get_headers_hook_(); + return Http::Code::OK; + } + bool nextChunk(Buffer::Instance& response) override { + next_chunk_hook_(); + response.add(chunk_); + return --chunks_remaining_ > 0; + } + + private: + const std::string chunk_; + uint64_t chunks_remaining_{NumChunks}; + std::function& get_headers_hook_; + std::function& next_chunk_hook_; + }; + + AdminStreamingTest() : AdminRequestTestBase(Network::Address::IpVersion::v4) { + startEnvoy(); + started_.WaitForNotification(); + Server::Admin& admin = *main_common_->server()->admin(); + admin.addStreamingHandler( + std::string(StreamingEndpoint), "streaming api", + [this](Server::AdminStream&) -> Server::Admin::RequestPtr { + return std::make_unique(get_headers_hook_, next_chunk_hook_); + }, + true, false); + } + + struct ResponseData { + uint64_t num_chunks_{0}; + uint64_t num_bytes_{0}; + Http::Code code_; + std::string content_type_; + }; + + ResponseData runStreamingRequest(AdminResponseSharedPtr response, + std::function chunk_hook = nullptr) { + absl::Notification done; + std::vector out; + absl::Notification headers_notify; + ResponseData response_data; + response->getHeaders( + [&headers_notify, &response_data](Http::Code code, Http::ResponseHeaderMap& headers) { + response_data.code_ = code; + response_data.content_type_ = headers.getContentTypeValue(); + headers_notify.Notify(); + }); + headers_notify.WaitForNotification(); + bool cont = true; + while (cont && !response->cancelled()) { + absl::Notification chunk_notify; + response->nextChunk( + [&chunk_notify, &response_data, &cont](Buffer::Instance& chunk, bool more) { + cont = more; + response_data.num_bytes_ += chunk.length(); + chunk.drain(chunk.length()); + ++response_data.num_chunks_; + chunk_notify.Notify(); + }); + chunk_notify.WaitForNotification(); + if (chunk_hook != nullptr) { + chunk_hook(); + } + } + + return response_data; + } + + /** + * @return a streaming response to a GET of StreamingEndpoint. + */ + AdminResponseSharedPtr streamingResponse() { + return main_common_->adminRequest(StreamingEndpoint, "GET"); + } + + /** + * In order to trigger certain early-exit criteria in a test, we can exploit + * the fact that all the admin responses are delivered on the main thread. + * So we can pause those by blocking the main thread indefinitely. + * + * The provided lambda runs in the main thread, between two notifications + * controlled by this function. + * + * @param fn function to run in the main thread, before interlockMainThread returns. + */ + void interlockMainThread(std::function fn) { + main_common_->dispatcherForTest().post([this, fn] { + resume_.WaitForNotification(); + fn(); + pause_point_.Notify(); + }); + resume_.Notify(); + pause_point_.WaitForNotification(); + } + + /** + * Requests the headers and waits until the headers have been sent. + * + * @param response the response from which to get headers. + */ + void waitForHeaders(AdminResponseSharedPtr response) { + absl::Notification headers_notify; + response->getHeaders( + [&headers_notify](Http::Code, Http::ResponseHeaderMap&) { headers_notify.Notify(); }); + headers_notify.WaitForNotification(); + } + + /** + * Initiates a '/quitquitquit' call and requests the headers for that call, + * but does not wait for the call to complete. We avoid waiting in order to + * trigger a potential race to ensure that MainCommon handles it properly. + */ + void quitAndRequestHeaders() { + AdminResponseSharedPtr quit_response = main_common_->adminRequest("/quitquitquit", "POST"); + quit_response->getHeaders([](Http::Code, Http::ResponseHeaderMap&) {}); + } + + // This variable provides a hook to allow a test method to specify a hook to + // run when nextChunk() is called. This is currently used only for one test, + // CancelAfterAskingForChunk, that initiates a cancel() from within the chunk + // handler. + std::function get_headers_hook_ = []() {}; + std::function next_chunk_hook_ = []() {}; +}; + +TEST_F(AdminStreamingTest, RequestGetStatsAndQuit) { + AdminResponseSharedPtr response = streamingResponse(); + ResponseData response_data = runStreamingRequest(response); + EXPECT_EQ(StreamingAdminRequest::NumChunks, response_data.num_chunks_); + EXPECT_EQ(StreamingAdminRequest::NumChunks * StreamingAdminRequest::BytesPerChunk, + response_data.num_bytes_); + EXPECT_EQ(Http::Code::OK, response_data.code_); + EXPECT_EQ("text/plain; charset=UTF-8", response_data.content_type_); + EXPECT_TRUE(quitAndWait()); +} + +TEST_F(AdminStreamingTest, QuitDuringChunks) { + int quit_counter = 0; + static constexpr int chunks_to_send_before_quitting = 3; + AdminResponseSharedPtr response = streamingResponse(); + ResponseData response_data = runStreamingRequest(response, [&quit_counter, this]() { + if (++quit_counter == chunks_to_send_before_quitting) { + EXPECT_TRUE(quitAndWait()); + } + }); + EXPECT_EQ(4, response_data.num_chunks_); + EXPECT_EQ(chunks_to_send_before_quitting * StreamingAdminRequest::BytesPerChunk, + response_data.num_bytes_); + EXPECT_EQ(Http::Code::OK, response_data.code_); + EXPECT_EQ("text/plain; charset=UTF-8", response_data.content_type_); +} + +TEST_F(AdminStreamingTest, CancelDuringChunks) { + int quit_counter = 0; + static constexpr int chunks_to_send_before_quitting = 3; + AdminResponseSharedPtr response = streamingResponse(); + ResponseData response_data = runStreamingRequest(response, [response, &quit_counter]() { + if (++quit_counter == chunks_to_send_before_quitting) { + response->cancel(); + } + }); + EXPECT_EQ(3, response_data.num_chunks_); // no final call to the chunk handler after cancel. + EXPECT_EQ(chunks_to_send_before_quitting * StreamingAdminRequest::BytesPerChunk, + response_data.num_bytes_); + EXPECT_EQ(Http::Code::OK, response_data.code_); + EXPECT_EQ("text/plain; charset=UTF-8", response_data.content_type_); + EXPECT_TRUE(quitAndWait()); +} + +TEST_F(AdminStreamingTest, CancelBeforeAskingForHeader) { + AdminResponseSharedPtr response = streamingResponse(); + interlockMainThread([response]() { response->cancel(); }); + int header_calls = 0; + + // After 'cancel', the headers function will not be called. + response->getHeaders([&header_calls](Http::Code, Http::ResponseHeaderMap&) { ++header_calls; }); + EXPECT_TRUE(quitAndWait()); + EXPECT_EQ(0, header_calls); +} + +TEST_F(AdminStreamingTest, CancelAfterAskingForHeader1) { + int header_calls = 0; + AdminResponseSharedPtr response = streamingResponse(); + interlockMainThread([&header_calls, response]() { + response->getHeaders([&header_calls](Http::Code, Http::ResponseHeaderMap&) { ++header_calls; }); + response->cancel(); + }); + EXPECT_TRUE(quitAndWait()); + EXPECT_EQ(0, header_calls); +} + +TEST_F(AdminStreamingTest, CancelAfterAskingForHeader2) { + int header_calls = 0; + AdminResponseSharedPtr response = streamingResponse(); + get_headers_hook_ = [&response]() { response->cancel(); }; + response->getHeaders([&header_calls](Http::Code, Http::ResponseHeaderMap&) { ++header_calls; }); + EXPECT_TRUE(quitAndWait()); + EXPECT_EQ(0, header_calls); +} + +TEST_F(AdminStreamingTest, DeleteAfterAskingForHeader1) { + int header_calls = 0; + AdminResponseSharedPtr response = streamingResponse(); + interlockMainThread([&response, &header_calls]() { + response->getHeaders([&header_calls](Http::Code, Http::ResponseHeaderMap&) { ++header_calls; }); + response.reset(); + }); + EXPECT_TRUE(quitAndWait()); + EXPECT_EQ(1, header_calls); +} + +TEST_F(AdminStreamingTest, DeleteAfterAskingForHeader2) { + int header_calls = 0; + AdminResponseSharedPtr response = streamingResponse(); + get_headers_hook_ = [&response]() { response.reset(); }; + response->getHeaders([&header_calls](Http::Code, Http::ResponseHeaderMap&) { ++header_calls; }); + EXPECT_TRUE(quitAndWait()); + EXPECT_EQ(1, header_calls); +} + +TEST_F(AdminStreamingTest, CancelBeforeAskingForChunk1) { + AdminResponseSharedPtr response = streamingResponse(); + waitForHeaders(response); + response->cancel(); + int chunk_calls = 0; + response->nextChunk([&chunk_calls](Buffer::Instance&, bool) { ++chunk_calls; }); + EXPECT_TRUE(quitAndWait()); + EXPECT_EQ(0, chunk_calls); +} + +TEST_F(AdminStreamingTest, CancelBeforeAskingForChunk2) { + AdminResponseSharedPtr response = streamingResponse(); + waitForHeaders(response); + int chunk_calls = 0; + interlockMainThread([&response, &chunk_calls]() { + response->nextChunk([&chunk_calls](Buffer::Instance&, bool) { ++chunk_calls; }); + response->cancel(); + }); + EXPECT_TRUE(quitAndWait()); + EXPECT_EQ(0, chunk_calls); +} + +TEST_F(AdminStreamingTest, CancelAfterAskingForChunk) { + AdminResponseSharedPtr response = streamingResponse(); + waitForHeaders(response); + int chunk_calls = 0; + + // Cause the /streaming handler to pause while yielding the next chunk, to hit + // an early exit in requestNextChunk. + next_chunk_hook_ = [response]() { response->cancel(); }; + + interlockMainThread([&chunk_calls, response]() { + response->nextChunk([&chunk_calls](Buffer::Instance&, bool) { ++chunk_calls; }); + }); + + EXPECT_TRUE(quitAndWait()); + EXPECT_EQ(0, chunk_calls); +} + +TEST_F(AdminStreamingTest, QuitBeforeHeaders) { + AdminResponseSharedPtr response = streamingResponse(); + EXPECT_TRUE(quitAndWait()); + ResponseData response_data = runStreamingRequest(response); + EXPECT_EQ(1, response_data.num_chunks_); + EXPECT_EQ(0, response_data.num_bytes_); + EXPECT_EQ(Http::Code::InternalServerError, response_data.code_); + EXPECT_EQ("text/plain; charset=UTF-8", response_data.content_type_); +} + +TEST_F(AdminStreamingTest, QuitDeleteRace1) { + AdminResponseSharedPtr response = streamingResponse(); + // Initiates a streaming quit on the main thread, but do not wait for it. + quitAndRequestHeaders(); + response.reset(); // Races with the quitquitquit + EXPECT_TRUE(waitForEnvoyToExit()); +} + +TEST_F(AdminStreamingTest, QuitDeleteRace2) { + AdminResponseSharedPtr response = streamingResponse(); + adminRequest("/quitquitquit", "POST"); + response.reset(); + EXPECT_TRUE(waitForEnvoyToExit()); +} + +TEST_F(AdminStreamingTest, QuitCancelRace) { + AdminResponseSharedPtr response = streamingResponse(); + quitAndRequestHeaders(); + response->cancel(); // Races with the quitquitquit + EXPECT_TRUE(waitForEnvoyToExit()); +} + +TEST_F(AdminStreamingTest, QuitBeforeCreatingResponse) { + // Initiates a streaming quit on the main thread, and wait for headers, which + // will trigger the termination of the event loop, and subsequent nulling of + // main_common_. However we can pause the test infrastructure after the quit + // takes hold leaving main_common_ in tact, to reproduce a potential race. + pause_after_run_ = true; + adminRequest("/quitquitquit", "POST"); + pause_point_.WaitForNotification(); // run() finished, but main_common_ still exists. + AdminResponseSharedPtr response = streamingResponse(); + ResponseData response_data = runStreamingRequest(response); + EXPECT_EQ(1, response_data.num_chunks_); + EXPECT_EQ(0, response_data.num_bytes_); + EXPECT_EQ(Http::Code::InternalServerError, response_data.code_); + EXPECT_EQ("text/plain; charset=UTF-8", response_data.content_type_); + resume_.Notify(); + EXPECT_TRUE(waitForEnvoyToExit()); + response.reset(); +} + +TEST_F(AdminStreamingTest, TimeoutGettingResponse) { + absl::Notification got_headers; + AdminResponseSharedPtr response = streamingResponse(); + + // Mimics a slow admin response by adding a blocking notification in front + // of a call to initiate an admin request. + main_common_->dispatcherForTest().post([this, response, &got_headers] { + resume_.WaitForNotification(); + response->getHeaders( + [&got_headers](Http::Code, Http::ResponseHeaderMap&) { got_headers.Notify(); }); + pause_point_.Notify(); + }); + + ENVOY_LOG_MISC(info, "Blocking for 5 seconds to test timeout functionality..."); + ASSERT_FALSE(got_headers.WaitForNotificationWithTimeout(absl::Seconds(5))); + resume_.Notify(); + pause_point_.WaitForNotification(); + EXPECT_TRUE(quitAndWait()); +} + +} // namespace Envoy diff --git a/test/exe/main_common_test.cc b/test/exe/main_common_test.cc index 68d142e5b3b8..942a53e3e3b1 100644 --- a/test/exe/main_common_test.cc +++ b/test/exe/main_common_test.cc @@ -1,6 +1,5 @@ #include "envoy/common/platform.h" -#include "source/common/common/lock_guard.h" #include "source/common/common/mutex_tracer_impl.h" #include "source/common/common/random_generator.h" #include "source/common/common/thread.h" @@ -9,6 +8,7 @@ #include "source/exe/platform_impl.h" #include "source/server/options_impl.h" +#include "test/exe/main_common_test_base.h" #include "test/mocks/common.h" #include "test/test_common/contention.h" #include "test/test_common/environment.h" @@ -52,34 +52,10 @@ const std::string& outOfMemoryPattern() { * an argv array that is terminated with nullptr. Identifies the config * file relative to runfiles directory. */ -class MainCommonTest : public testing::TestWithParam { +class MainCommonTest : public MainCommonTestBase, + public testing::TestWithParam { protected: - MainCommonTest() - : config_file_(TestEnvironment::temporaryFileSubstitute( - "test/config/integration/google_com_proxy_port_0.yaml", TestEnvironment::ParamMap(), - TestEnvironment::PortMap(), GetParam())), - argv_({"envoy-static", "--use-dynamic-base-id", "-c", config_file_.c_str(), nullptr}) {} - - const char* const* argv() { return &argv_[0]; } - int argc() { return argv_.size() - 1; } - - // Adds an argument, assuring that argv remains null-terminated. - void addArg(const char* arg) { - ASSERT(!argv_.empty()); - const size_t last = argv_.size() - 1; - ASSERT(argv_[last] == nullptr); // invariant established in ctor, maintained below. - argv_[last] = arg; // guaranteed non-empty - argv_.push_back(nullptr); - } - - // Adds options to make Envoy exit immediately after initialization. - void initOnly() { - addArg("--mode"); - addArg("init_only"); - } - - std::string config_file_; - std::vector argv_; + MainCommonTest() : MainCommonTestBase(GetParam()) {} }; INSTANTIATE_TEST_SUITE_P(IpVersions, MainCommonTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), @@ -223,6 +199,16 @@ TEST_P(MainCommonTest, RetryDynamicBaseIdFails) { #endif } +// Verifies that the Logger::Registry is usable after constructing and +// destructing MainCommon. +TEST_P(MainCommonTest, ConstructDestructLogger) { + VERBOSE_EXPECT_NO_THROW(MainCommon main_common(argc(), argv())); + + const std::string logger_name = "logger"; + spdlog::details::log_msg log_msg(logger_name, spdlog::level::level_enum::err, "error"); + Logger::Registry::getSink()->log(log_msg); +} + // Test that std::set_new_handler() was called and the callback functions as expected. // This test fails under TSAN and ASAN, so don't run it in that build: // [ DEATH ] ==845==ERROR: ThreadSanitizer: requested allocation size 0x3e800000000 @@ -268,96 +254,10 @@ TEST_P(MainCommonDeathTest, OutOfMemoryHandler) { #endif } -class AdminRequestTest : public MainCommonTest { +class AdminRequestTest : public AdminRequestTestBase, + public testing::TestWithParam { protected: - AdminRequestTest() { addArg("--disable-hot-restart"); } - - // Runs an admin request specified in path, blocking until completion, and - // returning the response body. - std::string adminRequest(absl::string_view path, absl::string_view method) { - absl::Notification done; - std::string out; - main_common_->adminRequest( - path, method, - [&done, &out](const Http::HeaderMap& /*response_headers*/, absl::string_view body) { - out = std::string(body); - done.Notify(); - }); - done.WaitForNotification(); - return out; - } - - // Initiates Envoy running in its own thread. - void startEnvoy() { - envoy_thread_ = Thread::threadFactoryForTest().createThread([this]() { - // Note: main_common_ is accessed in the testing thread, but - // is race-free, as MainCommon::run() does not return until - // triggered with an adminRequest POST to /quitquitquit, which - // is done in the testing thread. - main_common_ = std::make_unique(argc(), argv()); - envoy_started_ = true; - started_.Notify(); - pauseResumeInterlock(pause_before_run_); - bool status = main_common_->run(); - pauseResumeInterlock(pause_after_run_); - main_common_.reset(); - envoy_finished_ = true; - envoy_return_ = status; - finished_.Notify(); - }); - } - - // Conditionally pauses at a critical point in the Envoy thread, waiting for - // the test thread to trigger something at that exact line. The test thread - // can then call resume_.Notify() to allow the Envoy thread to resume. - void pauseResumeInterlock(bool enable) { - if (enable) { - pause_point_.Notify(); - resume_.WaitForNotification(); - } - } - - // Wait until Envoy is inside the main server run loop proper. Before entering, Envoy runs any - // pending post callbacks, so it's not reliable to use adminRequest() or post() to do this. - // Generally, tests should not depend on this for correctness, but as a result of - // https://github.com/libevent/libevent/issues/779 we need to for TSAN. This is because the entry - // to event_base_loop() is where the signal base race occurs, but once we're in that loop in - // blocking mode, we're safe to take signals. - // TODO(htuch): Remove when https://github.com/libevent/libevent/issues/779 is fixed. - void waitForEnvoyRun() { - absl::Notification done; - main_common_->dispatcherForTest().post([this, &done] { - struct Sacrifice : Event::DeferredDeletable { - Sacrifice(absl::Notification& notify) : notify_(notify) {} - ~Sacrifice() override { notify_.Notify(); } - absl::Notification& notify_; - }; - auto sacrifice = std::make_unique(done); - // Wait for a deferred delete cleanup, this only happens in the main server run loop. - main_common_->dispatcherForTest().deferredDelete(std::move(sacrifice)); - }); - done.WaitForNotification(); - } - - // Having triggered Envoy to quit (via signal or /quitquitquit), this blocks until Envoy exits. - bool waitForEnvoyToExit() { - finished_.WaitForNotification(); - envoy_thread_->join(); - return envoy_return_; - } - - Stats::IsolatedStoreImpl stats_store_; - std::unique_ptr envoy_thread_; - std::unique_ptr main_common_; - absl::Notification started_; - absl::Notification finished_; - absl::Notification resume_; - absl::Notification pause_point_; - bool envoy_return_{false}; - bool envoy_started_{false}; - bool envoy_finished_{false}; - bool pause_before_run_{false}; - bool pause_after_run_{false}; + AdminRequestTest() : AdminRequestTestBase(GetParam()) {} }; INSTANTIATE_TEST_SUITE_P(IpVersions, AdminRequestTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), @@ -367,8 +267,7 @@ TEST_P(AdminRequestTest, AdminRequestGetStatsAndQuit) { startEnvoy(); started_.WaitForNotification(); EXPECT_THAT(adminRequest("/stats", "GET"), HasSubstr("filesystem.reopen_failed")); - adminRequest("/quitquitquit", "POST"); - EXPECT_TRUE(waitForEnvoyToExit()); + quitAndWait(); } // no signals on Windows -- could probably make this work with GenerateConsoleCtrlEvent @@ -459,8 +358,7 @@ TEST_P(AdminRequestTest, AdminRequestBeforeRun) { // We don't get a notification when run(), so it's not safe to check whether the // admin handler is called until after we quit. - adminRequest("/quitquitquit", "POST"); - EXPECT_TRUE(waitForEnvoyToExit()); + quitAndWait(); EXPECT_TRUE(admin_handler_was_called); // This just checks that some stat output was reported. We could pick any stat. @@ -515,14 +413,4 @@ TEST_P(AdminRequestTest, AdminRequestAfterRun) { EXPECT_EQ(1, lambda_destroy_count); } -// Verifies that the Logger::Registry is usable after constructing and -// destructing MainCommon. -TEST_P(MainCommonTest, ConstructDestructLogger) { - VERBOSE_EXPECT_NO_THROW(MainCommon main_common(argc(), argv())); - - const std::string logger_name = "logger"; - spdlog::details::log_msg log_msg(logger_name, spdlog::level::level_enum::err, "error"); - Logger::Registry::getSink()->log(log_msg); -} - } // namespace Envoy diff --git a/test/exe/main_common_test_base.cc b/test/exe/main_common_test_base.cc new file mode 100644 index 000000000000..d0f3960b4148 --- /dev/null +++ b/test/exe/main_common_test_base.cc @@ -0,0 +1,117 @@ +#include "test/exe/main_common_test_base.h" + +#include "source/common/common/thread.h" + +#include "test/test_common/thread_factory_for_test.h" + +namespace Envoy { + +MainCommonTestBase::MainCommonTestBase(Network::Address::IpVersion version) + : config_file_(TestEnvironment::temporaryFileSubstitute( + "test/config/integration/google_com_proxy_port_0.yaml", TestEnvironment::ParamMap(), + TestEnvironment::PortMap(), version)), + argv_({"envoy-static", "--use-dynamic-base-id", "-c", config_file_.c_str(), nullptr}) {} + +const char* const* MainCommonTestBase::argv() { return &argv_[0]; } +int MainCommonTestBase::argc() { return argv_.size() - 1; } + +// Adds an argument, assuring that argv remains null-terminated. +void MainCommonTestBase::addArg(const char* arg) { + ASSERT(!argv_.empty()); + const size_t last = argv_.size() - 1; + ASSERT(argv_[last] == nullptr); // invariant established in ctor, maintained below. + argv_[last] = arg; // guaranteed non-empty + argv_.push_back(nullptr); +} + +// Adds options to make Envoy exit immediately after initialization. +void MainCommonTestBase::initOnly() { + addArg("--mode"); + addArg("init_only"); +} + +AdminRequestTestBase::AdminRequestTestBase(Network::Address::IpVersion version) + : MainCommonTestBase(version) { + addArg("--disable-hot-restart"); +} + +// Runs an admin request specified in path, blocking until completion, and +// returning the response body. +std::string AdminRequestTestBase::adminRequest(absl::string_view path, absl::string_view method) { + absl::Notification done; + std::string out; + main_common_->adminRequest( + path, method, + [&done, &out](const Http::HeaderMap& /*response_headers*/, absl::string_view body) { + out = std::string(body); + done.Notify(); + }); + done.WaitForNotification(); + return out; +} + +// Initiates Envoy running in its own thread. +void AdminRequestTestBase::startEnvoy() { + envoy_thread_ = Thread::threadFactoryForTest().createThread([this]() { + // Note: main_common_ is accessed in the testing thread, but + // is race-free, as MainCommon::run() does not return until + // triggered with an adminRequest POST to /quitquitquit, which + // is done in the testing thread. + main_common_ = std::make_unique(argc(), argv()); + envoy_started_ = true; + started_.Notify(); + pauseResumeInterlock(pause_before_run_); + bool status = main_common_->run(); + pauseResumeInterlock(pause_after_run_); + main_common_.reset(); + envoy_finished_ = true; + envoy_return_ = status; + finished_.Notify(); + }); +} + +// Conditionally pauses at a critical point in the Envoy thread, waiting for +// the test thread to trigger something at that exact line. The test thread +// can then call resume_.Notify() to allow the Envoy thread to resume. +void AdminRequestTestBase::pauseResumeInterlock(bool enable) { + if (enable) { + pause_point_.Notify(); + resume_.WaitForNotification(); + } +} + +// Wait until Envoy is inside the main server run loop proper. Before entering, Envoy runs any +// pending post callbacks, so it's not reliable to use adminRequest() or post() to do this. +// Generally, tests should not depend on this for correctness, but as a result of +// https://github.com/libevent/libevent/issues/779 we need to for TSAN. This is because the entry +// to event_base_loop() is where the signal base race occurs, but once we're in that loop in +// blocking mode, we're safe to take signals. +// TODO(htuch): Remove when https://github.com/libevent/libevent/issues/779 is fixed. +void AdminRequestTestBase::waitForEnvoyRun() { + absl::Notification done; + main_common_->dispatcherForTest().post([this, &done] { + struct Sacrifice : Event::DeferredDeletable { + Sacrifice(absl::Notification& notify) : notify_(notify) {} + ~Sacrifice() override { notify_.Notify(); } + absl::Notification& notify_; + }; + auto sacrifice = std::make_unique(done); + // Wait for a deferred delete cleanup, this only happens in the main server run loop. + main_common_->dispatcherForTest().deferredDelete(std::move(sacrifice)); + }); + done.WaitForNotification(); +} + +// Having triggered Envoy to quit (via signal or /quitquitquit), this blocks until Envoy exits. +bool AdminRequestTestBase::waitForEnvoyToExit() { + finished_.WaitForNotification(); + envoy_thread_->join(); + return envoy_return_; +} + +bool AdminRequestTestBase::quitAndWait() { + adminRequest("/quitquitquit", "POST"); + return waitForEnvoyToExit(); +} + +} // namespace Envoy diff --git a/test/exe/main_common_test_base.h b/test/exe/main_common_test_base.h new file mode 100644 index 000000000000..91ae895dd69f --- /dev/null +++ b/test/exe/main_common_test_base.h @@ -0,0 +1,77 @@ +#pragma once + +#include +#include + +#include "source/common/stats/isolated_store_impl.h" +#include "source/exe/main_common.h" + +#include "test/test_common/environment.h" + +#include "absl/synchronization/notification.h" + +namespace Envoy { + +class MainCommonTestBase { +protected: + MainCommonTestBase(Network::Address::IpVersion version); + const char* const* argv(); + int argc(); + + // Adds an argument, assuring that argv remains null-terminated. + void addArg(const char* arg); + + // Adds options to make Envoy exit immediately after initialization. + void initOnly(); + + std::string config_file_; + std::vector argv_; +}; + +class AdminRequestTestBase : public MainCommonTestBase { +protected: + AdminRequestTestBase(Network::Address::IpVersion version); + + // Runs an admin request specified in path, blocking until completion, and + // returning the response body. + std::string adminRequest(absl::string_view path, absl::string_view method); + + // Initiates Envoy running in its own thread. + void startEnvoy(); + + // Conditionally pauses at a critical point in the Envoy thread, waiting for + // the test thread to trigger something at that exact line. The test thread + // can then call resume_.Notify() to allow the Envoy thread to resume. + void pauseResumeInterlock(bool enable); + + // Wait until Envoy is inside the main server run loop proper. Before entering, Envoy runs any + // pending post callbacks, so it's not reliable to use adminRequest() or post() to do this. + // Generally, tests should not depend on this for correctness, but as a result of + // https://github.com/libevent/libevent/issues/779 we need to for TSAN. This is because the entry + // to event_base_loop() is where the signal base race occurs, but once we're in that loop in + // blocking mode, we're safe to take signals. + // TODO(htuch): Remove when https://github.com/libevent/libevent/issues/779 is fixed. + void waitForEnvoyRun(); + + // Having triggered Envoy to quit (via signal or /quitquitquit), this blocks until Envoy exits. + bool waitForEnvoyToExit(); + + // Sends a quit request to the server, and waits for Envoy to exit. Returns + // true if successful. + bool quitAndWait(); + + Stats::IsolatedStoreImpl stats_store_; + std::unique_ptr envoy_thread_; + std::unique_ptr main_common_; + absl::Notification started_; + absl::Notification finished_; + absl::Notification resume_; + absl::Notification pause_point_; + bool envoy_return_{false}; + bool envoy_started_{false}; + bool envoy_finished_{false}; + bool pause_before_run_{false}; + bool pause_after_run_{false}; +}; + +} // namespace Envoy diff --git a/test/integration/admin_html/test_server.cc b/test/integration/admin_html/test_server.cc index 4a53d7661bb9..d371d7b22108 100644 --- a/test/integration/admin_html/test_server.cc +++ b/test/integration/admin_html/test_server.cc @@ -14,7 +14,8 @@ namespace { * a query param but it could not be found. * * This test-server is only for testing; it potentially makes the - * entire file-system avail + * entire file-system available to HTTP clients, so this should not + * be used for production systems. */ Http::Code testCallback(Http::ResponseHeaderMap& response_headers, Buffer::Instance& response, Server::AdminStream& admin_stream) { diff --git a/test/mocks/server/admin.h b/test/mocks/server/admin.h index c7308d6a044e..a5be3ab379a0 100644 --- a/test/mocks/server/admin.h +++ b/test/mocks/server/admin.h @@ -40,6 +40,7 @@ class MockAdmin : public Admin { MOCK_METHOD(void, addListenerToHandler, (Network::ConnectionHandler * handler)); MOCK_METHOD(uint32_t, concurrency, (), (const)); MOCK_METHOD(void, closeSocket, ()); + MOCK_METHOD(RequestPtr, makeRequest, (AdminStream & admin_stream), (const)); NiceMock config_tracker_; NiceMock socket_; diff --git a/test/per_file_coverage.sh b/test/per_file_coverage.sh index cd8f977a61fb..8939f020ba04 100755 --- a/test/per_file_coverage.sh +++ b/test/per_file_coverage.sh @@ -22,7 +22,7 @@ declare -a KNOWN_LOW_COVERAGE=( "source/common/signal:87.2" # Death tests don't report LCOV "source/common/thread:0.0" # Death tests don't report LCOV "source/common/watchdog:58.6" # Death tests don't report LCOV -"source/exe:90.3" +"source/exe:94.0" # increased by #32346, need coverage for terminate_handler and hot restart failures "source/extensions/clusters/common:91.5" # This can be increased again once `#24903` lands "source/extensions/common:93.0" #flaky: be careful adjusting "source/extensions/common/proxy_protocol:93.8" # Adjusted for security patch diff --git a/test/server/admin/admin_filter_test.cc b/test/server/admin/admin_filter_test.cc index 9627cfd1ca31..fbf6c93cd72d 100644 --- a/test/server/admin/admin_filter_test.cc +++ b/test/server/admin/admin_filter_test.cc @@ -7,27 +7,28 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +using testing::ByMove; using testing::InSequence; using testing::NiceMock; +using testing::Return; namespace Envoy { namespace Server { class AdminFilterTest : public testing::TestWithParam { public: - AdminFilterTest() : filter_(adminHandlerCallback), request_headers_{{":path", "/"}} { + AdminFilterTest() : filter_(admin_), request_headers_{{":path", "/"}} { + EXPECT_CALL(admin_, makeRequest(_)).WillOnce(Return(ByMove(adminHandlerCallback()))); filter_.setDecoderFilterCallbacks(callbacks_); } - NiceMock server_; + NiceMock admin_; Stats::IsolatedStoreImpl listener_scope_; AdminFilter filter_; NiceMock callbacks_; Http::TestRequestHeaderMapImpl request_headers_; - static Admin::RequestPtr adminHandlerCallback(AdminStream& admin_stream) { - // silence compiler warnings for unused params - UNREFERENCED_PARAMETER(admin_stream); + static Admin::RequestPtr adminHandlerCallback() { return AdminImpl::makeStaticTextRequest("OK\n", Http::Code::OK); } }; diff --git a/test/server/admin/admin_instance.cc b/test/server/admin/admin_instance.cc index 7b18c448e18f..a2b4d2f15d17 100644 --- a/test/server/admin/admin_instance.cc +++ b/test/server/admin/admin_instance.cc @@ -10,7 +10,7 @@ namespace Server { AdminInstanceTest::AdminInstanceTest() : cpu_profile_path_(TestEnvironment::temporaryPath("envoy.prof")), admin_(cpu_profile_path_, server_, false), request_headers_{{":path", "/"}}, - admin_filter_(admin_.createRequestFunction()) { + admin_filter_(admin_) { std::list access_logs; Filesystem::FilePathAndType file_info{Filesystem::DestinationType::File, "/dev/null"}; access_logs.emplace_back(new Extensions::AccessLoggers::File::FileAccessLog( diff --git a/test/server/config_validation/BUILD b/test/server/config_validation/BUILD index 6d838b121c91..ed55171c3d66 100644 --- a/test/server/config_validation/BUILD +++ b/test/server/config_validation/BUILD @@ -59,6 +59,7 @@ envoy_cc_test( "//source/extensions/filters/network/http_connection_manager:config", "//source/extensions/listener_managers/validation_listener_manager:validation_listener_manager_lib", "//source/extensions/transport_sockets/tls:config", + "//source/server/admin:admin_filter_lib", "//source/server/config_validation:server_lib", "//test/integration:integration_lib", "//test/mocks/network:network_mocks", diff --git a/test/server/config_validation/server_test.cc b/test/server/config_validation/server_test.cc index 8102b84b01d4..6462503b2cc8 100644 --- a/test/server/config_validation/server_test.cc +++ b/test/server/config_validation/server_test.cc @@ -4,6 +4,7 @@ #include "envoy/server/filter_config.h" #include "source/extensions/listener_managers/validation_listener_manager/validation_listener_manager.h" +#include "source/server/admin/admin_filter.h" #include "source/server/config_validation/server.h" #include "source/server/process_context_impl.h" @@ -207,6 +208,8 @@ TEST_P(ValidationServerTest, DummyMethodsTest) { server.admin()->addListenerToHandler(nullptr); server.admin()->closeSocket(); server.admin()->startHttpListener({}, nullptr, nullptr); + AdminFilter filter(*server.admin()); + EXPECT_TRUE(server.admin()->makeRequest(filter) == nullptr); Network::MockTcpListenerCallbacks listener_callbacks; Network::MockListenerConfig listener_config;