Skip to content

Commit

Permalink
grpc_json_transcoder: Adhere to decoder and encoder buffer limits (#1…
Browse files Browse the repository at this point in the history
…4906)

Signed-off-by: Teju Nareddy <[email protected]>
  • Loading branch information
nareddyt authored Mar 18, 2021
1 parent 4f52f9e commit dde4eb6
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 5 deletions.
4 changes: 4 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ Minor Behavior Changes
logging, :ref:`auto_host_rewrite <envoy_api_field_route.RouteAction.auto_host_rewrite>`, etc.
Setting the hostname manually allows overriding the internal hostname used for such features while
still allowing the original DNS resolution name to be used.
* grpc_json_transcoder: filter now adheres to encoder and decoder buffer limits. Requests and responses
that require buffering over the limits will be directly rejected. The behavior can be reverted by
disabling runtime feature `envoy.reloadable_features.grpc_json_transcoder_adhere_to_buffer_limits`.
To reduce or increase the buffer limits the filter adheres to, reference the :ref:`flow control documentation <faq_flow_control>`.
* hds: support custom health check port via :ref:`health_check_config <envoy_v3_api_msg_config.endpoint.v3.endpoint.healthcheckconfig>`.
* healthcheck: the :ref:`health check filter <config_http_filters_health_check>` now sends the
:ref:`x-envoy-immediate-health-check-fail <config_http_filters_router_x-envoy-immediate-health-check-fail>` header
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ constexpr const char* runtime_features[] = {
"envoy.reloadable_features.dont_add_content_length_for_bodiless_requests",
"envoy.reloadable_features.enable_compression_without_content_length_header",
"envoy.reloadable_features.grpc_web_fix_non_proto_encoded_response_handling",
"envoy.reloadable_features.grpc_json_transcoder_adhere_to_buffer_limits",
"envoy.reloadable_features.hcm_stream_error_on_invalid_message",
"envoy.reloadable_features.health_check.graceful_goaway_handling",
"envoy.reloadable_features.health_check.immediate_failure_exclude_from_cluster",
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/grpc_json_transcoder/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ envoy_cc_library(
"//source/common/grpc:common_lib",
"//source/common/http:headers_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_features_lib",
"//source/extensions/filters/http:well_known_names",
"@com_google_googleapis//google/api:http_cc_proto",
"@envoy_api//envoy/extensions/filters/http/grpc_json_transcoder/v3:pkg_cc_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/http/utility.h"
#include "common/protobuf/protobuf.h"
#include "common/protobuf/utility.h"
#include "common/runtime/runtime_features.h"

#include "extensions/filters/http/grpc_json_transcoder/http_body_utils.h"
#include "extensions/filters/http/well_known_names.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ using RcDetails = ConstSingleton<RcDetailsValues>;

namespace {

constexpr absl::string_view buffer_limits_runtime_feature =
"envoy.reloadable_features.grpc_json_transcoder_adhere_to_buffer_limits";

const Http::LowerCaseString& trailerHeader() {
CONSTRUCT_ON_FIRST_USE(Http::LowerCaseString, "trailer");
}
Expand Down Expand Up @@ -536,6 +540,10 @@ Http::FilterDataStatus JsonTranscoderFilter::decodeData(Buffer::Instance& data,

if (method_->request_type_is_http_body_) {
request_data_.move(data);
if (decoderBufferLimitReached(request_data_.length())) {
return Http::FilterDataStatus::StopIterationNoBuffer;
}

// TODO(euroelessar): Upper bound message size for streaming case.
if (end_stream || method_->descriptor_->client_streaming()) {
maybeSendHttpBodyRequestMessage();
Expand All @@ -545,6 +553,9 @@ Http::FilterDataStatus JsonTranscoderFilter::decodeData(Buffer::Instance& data,
}
} else {
request_in_.move(data);
if (decoderBufferLimitReached(request_in_.bytesStored())) {
return Http::FilterDataStatus::StopIterationNoBuffer;
}

if (end_stream) {
request_in_.finish();
Expand Down Expand Up @@ -642,6 +653,9 @@ Http::FilterDataStatus JsonTranscoderFilter::encodeData(Buffer::Instance& data,
}

response_in_.move(data);
if (encoderBufferLimitReached(response_in_.bytesStored())) {
return Http::FilterDataStatus::StopIterationNoBuffer;
}

if (end_stream) {
response_in_.finish();
Expand Down Expand Up @@ -747,7 +761,6 @@ bool JsonTranscoderFilter::checkIfTranscoderFailed(const std::string& details) {
return false;
}

// TODO(lizan): Incorporate watermarks to bound buffer sizes
bool JsonTranscoderFilter::readToBuffer(Protobuf::io::ZeroCopyInputStream& stream,
Buffer::Instance& data) {
const void* out;
Expand Down Expand Up @@ -887,6 +900,50 @@ bool JsonTranscoderFilter::maybeConvertGrpcStatus(Grpc::Status::GrpcStatus grpc_
return true;
}

bool JsonTranscoderFilter::decoderBufferLimitReached(uint64_t buffer_length) {
if (!Runtime::runtimeFeatureEnabled(buffer_limits_runtime_feature)) {
return false;
}

if (buffer_length > decoder_callbacks_->decoderBufferLimit()) {
ENVOY_LOG(debug,
"Request rejected because the transcoder's internal buffer size exceeds the "
"configured limit: {} > {}",
buffer_length, decoder_callbacks_->decoderBufferLimit());
error_ = true;
decoder_callbacks_->sendLocalReply(
Http::Code::PayloadTooLarge,
"Request rejected because the transcoder's internal buffer size exceeds the configured "
"limit.",
nullptr, absl::nullopt,
absl::StrCat(RcDetails::get().GrpcTranscodeFailed, "{request_buffer_size_limit_reached}"));
return true;
}
return false;
}

bool JsonTranscoderFilter::encoderBufferLimitReached(uint64_t buffer_length) {
if (!Runtime::runtimeFeatureEnabled(buffer_limits_runtime_feature)) {
return false;
}

if (buffer_length > encoder_callbacks_->encoderBufferLimit()) {
ENVOY_LOG(debug,
"Response not transcoded because the transcoder's internal buffer size exceeds the "
"configured limit: {} > {}",
buffer_length, encoder_callbacks_->encoderBufferLimit());
error_ = true;
encoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError,
"Response not transcoded because the transcoder's internal buffer size exceeds the "
"configured limit.",
nullptr, absl::nullopt,
absl::StrCat(RcDetails::get().GrpcTranscodeFailed, "{response_buffer_size_limit_reached}"));
return true;
}
return false;
}

} // namespace GrpcJsonTranscoder
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ class JsonTranscoderFilter : public Http::StreamFilter, public Logger::Loggable<
void doTrailers(Http::ResponseHeaderOrTrailerMap& headers_or_trailers);
void initPerRouteConfig();

// Helpers for flow control.
bool decoderBufferLimitReached(uint64_t buffer_length);
bool encoderBufferLimitReached(uint64_t buffer_length);

JsonTranscoderConfig& config_;
const JsonTranscoderConfig* per_route_config_{};
std::unique_ptr<google::grpc::transcoding::Transcoder> transcoder_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ int64_t TranscoderInputStreamImpl::BytesAvailable() const { return buffer_->leng

bool TranscoderInputStreamImpl::Finished() const { return finished_; }

uint64_t TranscoderInputStreamImpl::bytesStored() const { return buffer_->length(); }

} // namespace GrpcJsonTranscoder
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class TranscoderInputStreamImpl : public Buffer::ZeroCopyInputStreamImpl,
// TranscoderInputStream
int64_t BytesAvailable() const override;
bool Finished() const override;

// Returns the total number of bytes stored in the underlying buffer.
// Useful for flow control.
uint64_t bytesStored() const;
};

} // namespace GrpcJsonTranscoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class GrpcJsonTranscoderIntegrationTest
const std::string& expected_response_body, bool full_response = true,
bool always_send_trailers = false,
const std::string expected_upstream_request_body = "",
bool expect_connection_to_upstream = true) {
bool expect_connection_to_upstream = true,
bool expect_response_complete = true) {
codec_client_ = makeHttpConnection(lookupPort("http"));

IntegrationStreamDecoderPtr response;
Expand Down Expand Up @@ -135,7 +136,7 @@ class GrpcJsonTranscoderIntegrationTest
}

response->waitForEndStream();
ASSERT_TRUE(response->complete());
EXPECT_EQ(response->complete(), expect_response_complete);

if (response->headers().get(Http::LowerCaseString("transfer-encoding")).empty() ||
!absl::StartsWith(response->headers()
Expand All @@ -152,8 +153,12 @@ class GrpcJsonTranscoderIntegrationTest
if (entry.value() == UnexpectedHeaderValue) {
EXPECT_TRUE(response->headers().get(lower_key).empty());
} else {
EXPECT_EQ(entry.value().getStringView(),
response->headers().get(lower_key)[0]->value().getStringView());
if (response->headers().get(lower_key).empty()) {
ADD_FAILURE() << "Header " << lower_key.get() << " not found.";
} else {
EXPECT_EQ(entry.value().getStringView(),
response->headers().get(lower_key)[0]->value().getStringView());
}
}
return Http::HeaderMap::Iterate::Continue;
});
Expand Down Expand Up @@ -1106,6 +1111,124 @@ TEST_P(GrpcJsonTranscoderIntegrationTest, EnableRequestValidationIgnoreQueryPara
"Could not resolve /unknown/path to a method.", true, false, "", false);
}

TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryPostRequestExceedsBufferLimit) {
// Request body is more than 8 bytes.
config_helper_.setBufferLimits(2 << 20, 8);
HttpIntegrationTest::initialize();

testTranscoding<bookstore::GetShelfRequest, bookstore::Shelf>(
Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/shelf"},
{":authority", "host"},
{"content-type", "application/json"}},
R"({"theme" : "Children"})", {}, {}, Status(),
Http::TestResponseHeaderMapImpl{{":status", "413"}},
"Request rejected because the transcoder's internal buffer size exceeds the configured "
"limit.",
true, false, "", true);
}

TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryPostResponseExceedsBufferLimit) {
// Request body is less than 35 bytes.
// Response body is more than 35 bytes.
config_helper_.setBufferLimits(2 << 20, 35);
HttpIntegrationTest::initialize();
testTranscoding<bookstore::CreateShelfRequest, bookstore::Shelf>(
Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/shelf"},
{":authority", "host"},
{"content-type", "application/json"}},
R"({"theme": "Children"})", {R"(shelf { theme: "Children" })"},
{R"(id: 20 theme: "Children 0123456789 0123456789 0123456789 0123456789" )"}, Status(),
Http::TestResponseHeaderMapImpl{
{":status", "500"}, {"content-type", "text/plain"}, {"content-length", "99"}},
"Response not transcoded because the transcoder's internal buffer size exceeds the "
"configured limit.");
}

TEST_P(GrpcJsonTranscoderIntegrationTest, UnaryPostHttpBodyRequestExceedsBufferLimit) {
// Request body is more than 8 bytes.
config_helper_.setBufferLimits(2 << 20, 8);
HttpIntegrationTest::initialize();

testTranscoding<google::api::HttpBody, google::api::HttpBody>(
Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/echoRawBody"},
{":authority", "host"},
{"content-type", "text/plain"}},
R"(hello world!)", {}, {}, Status(), Http::TestResponseHeaderMapImpl{{":status", "413"}},
"Request rejected because the transcoder's internal buffer size exceeds the configured "
"limit.",
true, false, "", true);
}

TEST_P(GrpcJsonTranscoderIntegrationTest, ServerStreamingGetExceedsBufferLimit) {
config_helper_.setBufferLimits(2 << 20, 60);
HttpIntegrationTest::initialize();

// Under limit: A single response message is less than 60 bytes.
// Messages transcoded successfully.
testTranscoding<bookstore::ListBooksRequest, bookstore::Book>(
Http::TestRequestHeaderMapImpl{
{":method", "GET"}, {":path", "/shelves/1/books"}, {":authority", "host"}},
"", {"shelf: 1"}, {R"(id: 1 author: "Neal Stephenson" title: "Readme")"}, Status(),
Http::TestResponseHeaderMapImpl{{":status", "200"}, {"content-type", "application/json"}},
R"([{"id":"1","author":"Neal Stephenson","title":"Readme"}])");

// Over limit: The server streams two response messages. Even through the transcoder
// handles them independently, portions of the first message are still in the
// internal buffers while the second one is processed.
//
// Because the headers and body is already sent, the stream is closed with
// an incomplete response.
testTranscoding<bookstore::ListBooksRequest, bookstore::Book>(
Http::TestRequestHeaderMapImpl{
{":method", "GET"}, {":path", "/shelves/1/books"}, {":authority", "host"}},
"", {"shelf: 1"},
{R"(id: 1 author: "Neal Stephenson" title: "Readme")",
R"(id: 2 author: "George R.R. Martin" title: "A Game of Thrones")"},
Status(),
Http::TestResponseHeaderMapImpl{{":status", "200"}, {"content-type", "application/json"}},
// Incomplete response, not valid JSON.
R"([{"id":"1","author":"Neal Stephenson","title":"Readme"})", true, false, "", true,
/*expect_response_complete=*/false);
}

TEST_P(GrpcJsonTranscoderIntegrationTest, ServerStreamingGetUnderBufferLimit) {
const int num_messages = 20;
config_helper_.setBufferLimits(2 << 20, 80);
HttpIntegrationTest::initialize();

// Craft multiple response messages. IF combined together, they exceed the buffer limit.
std::vector<std::string> grpc_response_messages;
grpc_response_messages.reserve(num_messages);
for (int i = 0; i < num_messages; i++) {
grpc_response_messages.push_back(R"(id: 1 author: "Neal Stephenson" title: "Readme")");
}

// Craft expected response.
std::vector<std::string> expected_json_messages;
expected_json_messages.reserve(num_messages);
for (int i = 0; i < num_messages; i++) {
expected_json_messages.push_back(R"({"id":"1","author":"Neal Stephenson","title":"Readme"})");
}
std::string expected_json_response =
absl::StrCat("[", absl::StrJoin(expected_json_messages, ","), "]");

// Under limit: Even though multiple messages are sent from the upstream, they are transcoded
// while streaming. The buffer limit is never hit. At most two messages are ever in the internal
// buffers. Transcoding succeeds.
testTranscoding<bookstore::ListBooksRequest, bookstore::Book>(
Http::TestRequestHeaderMapImpl{
{":method", "GET"}, {":path", "/shelves/1/books"}, {":authority", "host"}},
"", {"shelf: 1"}, grpc_response_messages, Status(),
Http::TestResponseHeaderMapImpl{{":status", "200"}, {"content-type", "application/json"}},
expected_json_response);
}

// TODO(nareddyt): Refactor testTranscoding and add a test case for client streaming under/over
// buffer limit. Will do in a separate PR to minimize diff.

TEST_P(GrpcJsonTranscoderIntegrationTest, RouteDisabled) {
overrideConfig(R"EOF({"services": [], "proto_descriptor_bin": ""})EOF");
HttpIntegrationTest::initialize();
Expand Down Expand Up @@ -1169,5 +1292,72 @@ TEST_P(OverrideConfigGrpcJsonTranscoderIntegrationTest, RouteOverride) {
R"({"shelves":[{"id":"20","theme":"Children"},{"id":"1","theme":"Foo"}]})");
};

// Tests to ensure transcoding buffer limits do not apply when the runtime feature is disabled.
class BufferLimitsDisabledGrpcJsonTranscoderIntegrationTest
: public GrpcJsonTranscoderIntegrationTest {
public:
void SetUp() override {
setUpstreamProtocol(FakeHttpConnection::Type::HTTP2);
const std::string filter =
R"EOF(
name: grpc_json_transcoder
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.grpc_json_transcoder.v3.GrpcJsonTranscoder
proto_descriptor : "{}"
services : "bookstore.Bookstore"
)EOF";
config_helper_.addFilter(
fmt::format(filter, TestEnvironment::runfilesPath("test/proto/bookstore.descriptor")));

// Disable runtime feature.
config_helper_.addRuntimeOverride(
"envoy.reloadable_features.grpc_json_transcoder_adhere_to_buffer_limits", "false");
}
};
INSTANTIATE_TEST_SUITE_P(IpVersions, BufferLimitsDisabledGrpcJsonTranscoderIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

TEST_P(BufferLimitsDisabledGrpcJsonTranscoderIntegrationTest, UnaryPostRequestExceedsBufferLimit) {
// Request body is more than 20 bytes.
config_helper_.setBufferLimits(2 << 20, 20);
HttpIntegrationTest::initialize();

// Transcoding succeeds.
testTranscoding<bookstore::CreateShelfRequest, bookstore::Shelf>(
Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/shelf"},
{":authority", "host"},
{"content-type", "application/json"}},
R"({"theme": "Children 0123456789 0123456789 0123456789 0123456789"})",
{R"(shelf { theme: "Children 0123456789 0123456789 0123456789 0123456789" })"}, {R"(id: 1)"},
Status(),
Http::TestResponseHeaderMapImpl{{":status", "200"},
{"content-type", "application/json"},
{"content-length", "10"},
{"grpc-status", "0"}},
R"({"id":"1"})");
}

TEST_P(BufferLimitsDisabledGrpcJsonTranscoderIntegrationTest, UnaryPostResponseExceedsBufferLimit) {
// Request body is less than 35 bytes.
// Response body is more than 35 bytes.
config_helper_.setBufferLimits(2 << 20, 35);
HttpIntegrationTest::initialize();

// Transcoding succeeds. However, the downstream client is unable to buffer the full response.
// We can tell these errors are NOT from the transcoder because the response body is too generic.
testTranscoding<bookstore::CreateShelfRequest, bookstore::Shelf>(
Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/shelf"},
{":authority", "host"},
{"content-type", "application/json"}},
R"({"theme": "Children"})", {R"(shelf { theme: "Children" })"},
{R"(id: 20 theme: "Children 0123456789 0123456789 0123456789 0123456789" )"}, Status(),
Http::TestResponseHeaderMapImpl{
{":status", "500"}, {"content-type", "text/plain"}, {"content-length", "21"}},
R"(Internal Server Error)");
}

} // namespace
} // namespace Envoy
Loading

0 comments on commit dde4eb6

Please sign in to comment.