Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admin: apply flow control via high/low watermarks #31087

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions source/server/admin/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,13 @@ class RequestGasket : public Admin::Request {
}

bool nextChunk(Buffer::Instance& response) override {
response.move(response_);
return false;
// Artificially make 1M chunks from the buffered admin output, to see if we
// can observe chunking in a live server.
std::string str = response_.toString();
absl::string_view head(str.data(), std::min(size_t(1000000), str.size()));
response.add(head);
response_.drain(head.size());
return response_.length() > 0;
}

private:
Expand Down
49 changes: 33 additions & 16 deletions source/server/admin/admin_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
namespace Envoy {
namespace Server {

AdminFilter::AdminFilter(Admin::GenRequestFn admin_handler_fn)
: admin_handler_fn_(admin_handler_fn) {}
AdminFilter::AdminFilter(Admin::GenRequestFn admin_request_fn)
: admin_request_fn_(admin_request_fn) {}

Http::FilterHeadersStatus AdminFilter::decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) {
// ENVOY_LOG_MISC(error, "decodeHeaders({})", end_stream);
request_headers_ = &headers;
if (end_stream) {
onComplete();
Expand Down Expand Up @@ -82,28 +83,44 @@ Http::Utility::QueryParamsMulti AdminFilter::queryParams() const {
}

void AdminFilter::onComplete() {
absl::string_view path = request_headers_->getPathValue();
decoder_callbacks_->addDownstreamWatermarkCallbacks(watermark_callbacks_);

const absl::string_view path = request_headers_->getPathValue();
ENVOY_STREAM_LOG(debug, "request complete: path: {}", *decoder_callbacks_, path);

auto header_map = Http::ResponseHeaderMapImpl::create();
RELEASE_ASSERT(request_headers_, "");
Admin::RequestPtr handler = admin_handler_fn_(*this);
Http::Code code = handler->start(*header_map);
request_ = admin_request_fn_(*this);
Http::Code code = request_->start(*header_map);
Utility::populateFallbackResponseHeaders(code, *header_map);
decoder_callbacks_->encodeHeaders(std::move(header_map), false,
StreamInfo::ResponseCodeDetails::get().AdminFilterResponse);
nextChunk();
}

bool more_data;
do {
Buffer::OwnedImpl response;
more_data = handler->nextChunk(response);
bool end_stream = end_stream_on_complete_ && !more_data;
ENVOY_LOG_MISC(debug, "nextChunk: response.length={} more_data={} end_stream={}",
response.length(), more_data, end_stream);
if (response.length() > 0 || end_stream) {
decoder_callbacks_->encodeData(response, end_stream);
}
} while (more_data);
void AdminFilter::nextChunk() {
if (!can_write_ || request_ == nullptr) {
ENVOY_LOG_MISC(error, "nextChunk exit early");
return;
}

Buffer::OwnedImpl response;
bool more_data = request_->nextChunk(response);
bool end_stream = end_stream_on_complete_ && !more_data;
if (response.length() > 0 || end_stream) {
// ENVOY_LOG_MISC(error, "nextChunk sent data {}", response.length());
decoder_callbacks_->encodeData(response, end_stream);
} else {
// ENVOY_LOG_MISC(error, "nextChunk no data");
}

if (more_data) {
ENVOY_LOG_MISC(error, "nextChunk posting next");
decoder_callbacks_->dispatcher().post([this]() { nextChunk(); });
} else {
ENVOY_LOG_MISC(error, "nextChunk reset");
request_.reset();
}
}

} // namespace Server
Expand Down
33 changes: 30 additions & 3 deletions source/server/admin/admin_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,29 @@ class AdminFilter : public Http::PassThroughFilter,
public AdminStream,
Logger::Loggable<Logger::Id::admin> {
public:
using AdminServerCallbackFunction = std::function<Http::Code(
using AdminServerCallbackFunction = std::function<Admin::RequestPtr(
absl::string_view path_and_query, Http::ResponseHeaderMap& response_headers,
Buffer::OwnedImpl& response, AdminFilter& filter)>;

AdminFilter(Admin::GenRequestFn admin_handler_func);
class WatermarkCallbacks : public Http::DownstreamWatermarkCallbacks {
public:
explicit WatermarkCallbacks(AdminFilter& filter) : filter_(filter) {}
~WatermarkCallbacks() override = default;
void onAboveWriteBufferHighWatermark() override {
ENVOY_LOG_MISC(error, "Above high-watermark callback");
filter_.can_write_ = false;
}
void onBelowWriteBufferLowWatermark() override {
filter_.can_write_ = true;
ENVOY_LOG_MISC(error, "Below high-watermark callback");
filter_.nextChunk();
}

private:
AdminFilter& filter_;
};

explicit AdminFilter(Admin::GenRequestFn admin_request_func);

// Http::StreamFilterBase
// Handlers relying on the reference should use addOnDestroyCallback()
Expand Down Expand Up @@ -58,10 +76,19 @@ class AdminFilter : public Http::PassThroughFilter,
* Called when an admin request has been completely received.
*/
void onComplete();
Admin::GenRequestFn admin_handler_fn_;

/**
* Called when the system is ready for admin to write the next chunk.
*/
void nextChunk();

Admin::GenRequestFn admin_request_fn_;
Http::RequestHeaderMap* request_headers_{};
std::list<std::function<void()>> on_destroy_callbacks_;
bool end_stream_on_complete_ = true;
Admin::RequestPtr request_;
bool can_write_{true};
WatermarkCallbacks watermark_callbacks_{*this};
};

} // namespace Server
Expand Down
Loading