Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admin: add streaming variant of the admin API #32346

Merged
merged 44 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
2cb2617
add streaming variant of the admin API
jmarantz Feb 13, 2024
d2412c6
remove superfluous diff
jmarantz Feb 13, 2024
48f114d
add streaming test
jmarantz Feb 13, 2024
d890eb3
add cancellation semantics and tests.
jmarantz Feb 18, 2024
b56ecb4
try to fix coverage and compiling without admin.
jmarantz Feb 18, 2024
aed8263
checkpoint
jmarantz Feb 19, 2024
3973c92
comment and cleanup
jmarantz Feb 19, 2024
9237ff9
format
jmarantz Feb 19, 2024
895e552
add asserts, refactor tests, and add quit/cancel race.
jmarantz Feb 19, 2024
4784a23
add cancel/quit race test
jmarantz Feb 19, 2024
d252659
tighten up logic and asserts.
jmarantz Feb 19, 2024
8ba301d
Cover a few missing lines.
jmarantz Feb 20, 2024
e6a10f3
format
jmarantz Feb 20, 2024
05c1127
hit one more early-exit line in main_common with a contrived test.
jmarantz Feb 20, 2024
c40e30a
improve consitency and handling of cancel function.
jmarantz Feb 20, 2024
8cfed12
add early exit in handleHeaders to force it to be tested, and refacto…
jmarantz Feb 20, 2024
ce30118
add comments.
jmarantz Feb 20, 2024
cd88721
cover more lines.
jmarantz Feb 21, 2024
4262d8d
tighten up coverage
jmarantz Feb 21, 2024
699f357
Merge branch 'main' into admin-streaming-c++-api
jmarantz Feb 21, 2024
47a51ca
mutex-protect destructor accesss to AdminResponse::terminated_. Make …
jmarantz Feb 22, 2024
f6edee6
add a TerminateNotifier to provide a context with appropriate lifetim…
jmarantz Feb 23, 2024
eda6834
remove superfluous function, fix compile-time issue with admin disabled.
jmarantz Feb 23, 2024
7239d03
add more tests and cleanup
jmarantz Feb 23, 2024
7fe67f3
fix initialization and expected results.
jmarantz Feb 23, 2024
7a433d1
Merge branch 'main' into admin-streaming-c++-api
jmarantz Feb 26, 2024
aa70f33
Split out AdminContext into its own source, hdr, test.
jmarantz Feb 27, 2024
a6d49b1
remove no-longer-needed 'friend' declarations and rename TerminateNot…
jmarantz Feb 27, 2024
153b2a2
format
jmarantz Feb 27, 2024
36195fe
cleanup & format
jmarantz Feb 27, 2024
4fb7898
fix admin-disabled build
jmarantz Feb 27, 2024
0cf32c1
address some review comments (others remain)
jmarantz Feb 28, 2024
03ce2c7
review comments
jmarantz Feb 29, 2024
48ece2d
remove ifdef'd out test helper.
jmarantz Feb 29, 2024
b006d79
add reference to FSM drawing
jmarantz Feb 29, 2024
f799b84
grammar
jmarantz Feb 29, 2024
e1640d3
fix race.
jmarantz Feb 29, 2024
d957b58
Merge branch 'main' into admin-streaming-c++-api
jmarantz Mar 1, 2024
06df4ec
Merge branch 'main' into admin-streaming-c++-api
jmarantz Mar 6, 2024
0e6a275
Better interlock for test infrastructure, hopefully still hitting all…
jmarantz Mar 6, 2024
23caf5e
remove run_before_resume hack.
jmarantz Mar 7, 2024
2427170
clean up
jmarantz Mar 7, 2024
bd8db80
Remove half-baked README.md, and add Tianyu's note to PtrSet comment.
jmarantz Mar 7, 2024
4fa21ea
Merge branch 'main' into admin-streaming-c++-api
jmarantz Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions envoy/server/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ class Admin {
* Closes the listening socket for the admin.
*/
virtual void closeSocket() PURE;

/**
* Creates a streaming request context from the url path in the admin stream.
*/
virtual RequestPtr makeRequest(AdminStream& admin_stream) const PURE;
};

} // namespace Server
Expand Down
16 changes: 15 additions & 1 deletion source/exe/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ load(
"envoy_cc_posix_without_linux_library",
"envoy_cc_win32_library",
"envoy_package",
"envoy_select_admin_functionality",
"envoy_select_enable_http3",
"envoy_select_signal_trace",
)
Expand Down Expand Up @@ -102,7 +103,7 @@ envoy_cc_library(
hdrs = [
"main_common.h",
],
deps = [
deps = envoy_select_admin_functionality([":admin_response_lib"]) + [
":platform_impl_lib",
":process_wide_lib",
":stripped_main_base_lib",
Expand All @@ -118,6 +119,19 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "admin_response_lib",
srcs = ["admin_response.cc"],
hdrs = ["admin_response.h"],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/http:header_map_lib",
"//source/server:server_lib",
"//source/server/admin:admin_lib",
"//source/server/admin:utils_lib",
],
)

envoy_cc_library(
name = "main_common_with_all_extensions_lib",
deps = [
Expand Down
191 changes: 191 additions & 0 deletions source/exe/admin_response.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#include "source/exe/admin_response.h"

#include "envoy/server/admin.h"

#include "source/server/admin/admin_filter.h"
#include "source/server/admin/utils.h"

namespace Envoy {

AdminResponse::AdminResponse(Server::Instance& server, absl::string_view path,
absl::string_view method, SharedPtrSet response_set)
: server_(server), opt_admin_(server.admin()), shared_response_set_(response_set) {
request_headers_->setMethod(method);
request_headers_->setPath(path);
}

AdminResponse::~AdminResponse() {
cancel();
shared_response_set_->detachResponse(this);
jmarantz marked this conversation as resolved.
Show resolved Hide resolved
}

void AdminResponse::getHeaders(HeadersFn fn) {
auto request_headers = [response = shared_from_this()]() { response->requestHeaders(); };

// First check for cancelling or termination.
{
absl::MutexLock lock(&mutex_);
ASSERT(headers_fn_ == nullptr);
if (cancelled_) {
return;
}
headers_fn_ = fn;
if (terminated_ || !opt_admin_) {
sendErrorLockHeld();
return;
}
}
server_.dispatcher().post(request_headers);
}

void AdminResponse::nextChunk(BodyFn fn) {
auto request_next_chunk = [response = shared_from_this()]() { response->requestNextChunk(); };

// Note the caller may race a call to nextChunk with the server being
// terminated.
{
absl::MutexLock lock(&mutex_);
ASSERT(body_fn_ == nullptr);
if (cancelled_) {
return;
}
body_fn_ = fn;
if (terminated_ || !opt_admin_) {
sendAbortChunkLockHeld();
return;
}
}

// Note that nextChunk may be called from any thread -- it's the callers choice,
// including the Envoy main thread, which would occur if the caller initiates
// the request of a chunk upon receipt of the previous chunk.
//
// In that case it may race against the AdminResponse object being deleted,
// in which case the callbacks, held in a shared_ptr, will be cancelled
// from the destructor. If that happens *before* we post to the main thread,
// we will just skip and never call fn.
server_.dispatcher().post(request_next_chunk);
}

// Called by the user if it is not longer interested in the result of the
// admin request. After calling cancel() the caller must not call nextChunk or
// getHeaders.
void AdminResponse::cancel() {
absl::MutexLock lock(&mutex_);
cancelled_ = true;
headers_fn_ = nullptr;
body_fn_ = nullptr;
}

bool AdminResponse::cancelled() const {
absl::MutexLock lock(&mutex_);
return cancelled_;
}

// Called from terminateAdminRequests when the Envoy server
// terminates. After this is called, the caller may need to complete the
// admin response, and so calls to getHeader and nextChunk remain valid,
// resulting in 503 and an empty body.
void AdminResponse::terminate() {
ASSERT_IS_MAIN_OR_TEST_THREAD();
absl::MutexLock lock(&mutex_);
if (!terminated_) {
terminated_ = true;
sendErrorLockHeld();
sendAbortChunkLockHeld();
}
}

void AdminResponse::requestHeaders() {
ASSERT_IS_MAIN_OR_TEST_THREAD();
{
absl::MutexLock lock(&mutex_);
if (cancelled_ || terminated_) {
return;
}
}
Server::AdminFilter filter(*opt_admin_);
filter.decodeHeaders(*request_headers_, false);
request_ = opt_admin_->makeRequest(filter);
code_ = request_->start(*response_headers_);
{
absl::MutexLock lock(&mutex_);
if (headers_fn_ == nullptr || cancelled_) {
return;
}
Server::Utility::populateFallbackResponseHeaders(code_, *response_headers_);
headers_fn_(code_, *response_headers_);
headers_fn_ = nullptr;
}
}

void AdminResponse::requestNextChunk() {
jmarantz marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_IS_MAIN_OR_TEST_THREAD();
{
absl::MutexLock lock(&mutex_);
if (cancelled_ || terminated_ || !more_data_) {
return;
}
}
ASSERT(response_.length() == 0);
more_data_ = request_->nextChunk(response_);
{
absl::MutexLock lock(&mutex_);
if (sent_end_stream_ || cancelled_) {
return;
}
sent_end_stream_ = !more_data_;
body_fn_(response_, more_data_);
ASSERT(response_.length() == 0);
body_fn_ = nullptr;
}
}

void AdminResponse::sendAbortChunkLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
if (!sent_end_stream_ && body_fn_ != nullptr) {
response_.drain(response_.length());
body_fn_(response_, false);
sent_end_stream_ = true;
}
body_fn_ = nullptr;
}

void AdminResponse::sendErrorLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
if (headers_fn_ != nullptr) {
code_ = Http::Code::InternalServerError;
Server::Utility::populateFallbackResponseHeaders(code_, *response_headers_);
headers_fn_(code_, *response_headers_);
headers_fn_ = nullptr;
}
}

void AdminResponse::PtrSet::terminateAdminRequests() {
ASSERT_IS_MAIN_OR_TEST_THREAD();

absl::MutexLock lock(&mutex_);
accepting_admin_requests_ = false;
for (AdminResponse* response : response_set_) {
// Consider the possibility of response being deleted due to its creator
// dropping its last reference right here. From its destructor it will call
// detachResponse(), which is mutex-ed against this loop, so before the
// memory becomes invalid, the call to terminate will complete.
response->terminate();
}
response_set_.clear();
}

void AdminResponse::PtrSet::attachResponse(AdminResponse* response) {
absl::MutexLock lock(&mutex_);
if (accepting_admin_requests_) {
response_set_.insert(response);
} else {
response->terminate();
}
}

void AdminResponse::PtrSet::detachResponse(AdminResponse* response) {
absl::MutexLock lock(&mutex_);
response_set_.erase(response);
}

} // namespace Envoy
Loading
Loading