diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 26403db2dd1f..79698905729b 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -152,7 +152,17 @@ void AsyncStreamImpl::onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_s void AsyncStreamImpl::onData(Buffer::Instance& data, bool end_stream) { decoded_frames_.clear(); - if (!decoder_.decode(data, decoded_frames_)) { + auto status = decoder_.decode(data, decoded_frames_); + + // decode() currently only returns two types of error: + // - decoding error is mapped to ResourceExhausted + // - over-limit error is mapped to Internal. + // Other potential errors in the future are mapped to internal for now. + if (status.code() == absl::StatusCode::kResourceExhausted) { + streamError(Status::WellKnownGrpcStatus::ResourceExhausted); + return; + } + if (status.code() != absl::StatusCode::kOk) { streamError(Status::WellKnownGrpcStatus::Internal); return; } diff --git a/source/common/grpc/codec.cc b/source/common/grpc/codec.cc index eb24b4918407..1eceba951b81 100644 --- a/source/common/grpc/codec.cc +++ b/source/common/grpc/codec.cc @@ -31,7 +31,7 @@ void Encoder::prependFrameHeader(uint8_t flags, Buffer::Instance& buffer, uint32 buffer.prepend(frame_buffer); } -bool Decoder::decode(Buffer::Instance& input, std::vector& output) { +absl::Status Decoder::decode(Buffer::Instance& input, std::vector& output) { // Make sure those flags are set to initial state. decoding_error_ = false; is_frame_oversized_ = false; @@ -39,12 +39,16 @@ bool Decoder::decode(Buffer::Instance& input, std::vector& output) { inspect(input); output_ = nullptr; - if (decoding_error_ || is_frame_oversized_) { - return false; + if (decoding_error_) { + return absl::InternalError("Grpc decoding error"); + } + + if (is_frame_oversized_) { + return absl::ResourceExhaustedError("Grpc frame length exceeds limit"); } input.drain(input.length()); - return true; + return absl::OkStatus(); } bool Decoder::frameStart(uint8_t flags) { diff --git a/source/common/grpc/codec.h b/source/common/grpc/codec.h index 3b6692c96a97..192ed22ec07d 100644 --- a/source/common/grpc/codec.h +++ b/source/common/grpc/codec.h @@ -128,8 +128,8 @@ class Decoder : public FrameInspector { // error happened, the input buffer remains unchanged. // @param input supplies the binary octets wrapped in a GRPC data frame. // @param output supplies the buffer to store the decoded data. - // @return bool whether the decoding succeeded or not. - bool decode(Buffer::Instance& input, std::vector& output); + // @return absl::status whether the decoding succeeded or not. + absl::Status decode(Buffer::Instance& input, std::vector& output); // Determine the length of the current frame being decoded. This is useful when supplying a // partial frame to decode() and wanting to know how many more bytes need to be read to complete diff --git a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc index 15aaeb4a0f31..13187e30675e 100644 --- a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc +++ b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc @@ -883,7 +883,7 @@ void JsonTranscoderFilter::maybeSendHttpBodyRequestMessage(Buffer::Instance* dat bool JsonTranscoderFilter::buildResponseFromHttpBodyOutput( Http::ResponseHeaderMap& response_headers, Buffer::Instance& data) { std::vector frames; - decoder_.decode(data, frames); + std::ignore = decoder_.decode(data, frames); if (frames.empty()) { return false; } diff --git a/source/extensions/filters/http/grpc_web/grpc_web_filter.cc b/source/extensions/filters/http/grpc_web/grpc_web_filter.cc index 734fe8cf12cc..1d4dfca44184 100644 --- a/source/extensions/filters/http/grpc_web/grpc_web_filter.cc +++ b/source/extensions/filters/http/grpc_web/grpc_web_filter.cc @@ -300,7 +300,7 @@ Http::FilterDataStatus GrpcWebFilter::encodeData(Buffer::Instance& data, bool en // The decoder always consumes and drains the given buffer. Incomplete data frame is buffered // inside the decoder. std::vector frames; - decoder_.decode(data, frames); + std::ignore = decoder_.decode(data, frames); if (frames.empty()) { // We don't have enough data to decode for one single frame, stop iteration until more data // comes in. diff --git a/source/extensions/health_checkers/grpc/health_checker_impl.cc b/source/extensions/health_checkers/grpc/health_checker_impl.cc index ee9b4a047ae8..982793b7c77c 100644 --- a/source/extensions/health_checkers/grpc/health_checker_impl.cc +++ b/source/extensions/health_checkers/grpc/health_checker_impl.cc @@ -135,7 +135,7 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeData(Buffer::Ins } // We should end up with only one frame here. std::vector decoded_frames; - if (!decoder_.decode(data, decoded_frames)) { + if (!decoder_.decode(data, decoded_frames).ok()) { onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "gRPC wire protocol decode error", false); return; diff --git a/test/common/grpc/codec_fuzz_test.cc b/test/common/grpc/codec_fuzz_test.cc index 1d75425e7bbc..bd70ef9a26b5 100644 --- a/test/common/grpc/codec_fuzz_test.cc +++ b/test/common/grpc/codec_fuzz_test.cc @@ -59,7 +59,7 @@ DEFINE_FUZZER(const uint8_t* buf, size_t len) { : provider.ConsumeIntegralInRange(0, wire_buffer.length()); Buffer::OwnedImpl decode_buffer; decode_buffer.move(wire_buffer, decode_length); - const bool decode_result = decoder.decode(decode_buffer, frames); + const absl::Status decode_result = decoder.decode(decode_buffer, frames); // If we have recovered the original frames, we're decoding garbage. It // might end up being a valid frame, but there is no predictability, so just // drain and move on. If we haven't recovered the original frames, we @@ -67,7 +67,7 @@ DEFINE_FUZZER(const uint8_t* buf, size_t len) { if (frames.size() >= num_encode_frames) { decode_buffer.drain(decode_buffer.length()); } else { - FUZZ_ASSERT(decode_result); + FUZZ_ASSERT(decode_result.ok()); FUZZ_ASSERT(decode_buffer.length() == 0); } } diff --git a/test/common/grpc/codec_test.cc b/test/common/grpc/codec_test.cc index ce722bdd5c09..e4fdd165b967 100644 --- a/test/common/grpc/codec_test.cc +++ b/test/common/grpc/codec_test.cc @@ -70,14 +70,14 @@ TEST(GrpcCodecTest, decodeIncompleteFrame) { std::vector frames; Decoder decoder; - EXPECT_TRUE(decoder.decode(buffer, frames)); + EXPECT_TRUE(decoder.decode(buffer, frames).ok()); EXPECT_EQ(static_cast(0), buffer.length()); EXPECT_EQ(static_cast(0), frames.size()); EXPECT_EQ(static_cast(request.ByteSize()), decoder.length()); EXPECT_EQ(true, decoder.hasBufferedData()); buffer.add(request_buffer.c_str() + 5); - EXPECT_TRUE(decoder.decode(buffer, frames)); + EXPECT_TRUE(decoder.decode(buffer, frames).ok()); EXPECT_EQ(static_cast(0), buffer.length()); EXPECT_EQ(static_cast(1), frames.size()); EXPECT_EQ(static_cast(0), decoder.length()); @@ -102,7 +102,7 @@ TEST(GrpcCodecTest, decodeInvalidFrame) { std::vector frames; Decoder decoder; - EXPECT_FALSE(decoder.decode(buffer, frames)); + EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kInternal); EXPECT_EQ(size, buffer.length()); } @@ -117,7 +117,7 @@ TEST(GrpcCodecTest, DecodeMultipleFramesInvalid) { std::vector frames; Decoder decoder; - EXPECT_FALSE(decoder.decode(buffer, frames)); + EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kInternal); // When the decoder doesn't successfully decode, it puts decoded frames up until // an invalid frame into output frame vector. EXPECT_EQ(1, frames.size()); @@ -146,7 +146,7 @@ TEST(GrpcCodecTest, DecodeValidFrameWithInvalidFrameAfterward) { std::vector frames; Decoder decoder; - EXPECT_FALSE(decoder.decode(buffer, frames)); + EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kInternal); // When the decoder doesn't successfully decode, it puts valid frames up until // an invalid frame into output frame vector. EXPECT_EQ(1, frames.size()); @@ -162,7 +162,7 @@ TEST(GrpcCodecTest, decodeEmptyFrame) { Decoder decoder; std::vector frames; - EXPECT_TRUE(decoder.decode(buffer, frames)); + EXPECT_TRUE(decoder.decode(buffer, frames).ok()); EXPECT_EQ(1, frames.size()); EXPECT_EQ(0, frames[0].length_); @@ -181,7 +181,7 @@ TEST(GrpcCodecTest, decodeSingleFrame) { std::vector frames; Decoder decoder; - EXPECT_TRUE(decoder.decode(buffer, frames)); + EXPECT_TRUE(decoder.decode(buffer, frames).ok()); EXPECT_EQ(static_cast(0), buffer.length()); EXPECT_EQ(frames.size(), static_cast(1)); EXPECT_EQ(GRPC_FH_DEFAULT, frames[0].flags_); @@ -208,7 +208,7 @@ TEST(GrpcCodecTest, decodeMultipleFrame) { std::vector frames; Decoder decoder; - EXPECT_TRUE(decoder.decode(buffer, frames)); + EXPECT_TRUE(decoder.decode(buffer, frames).ok()); EXPECT_EQ(static_cast(0), buffer.length()); EXPECT_EQ(frames.size(), static_cast(1009)); for (Frame& frame : frames) { @@ -240,7 +240,7 @@ TEST(GrpcCodecTest, decodeSingleFrameOverLimit) { decoder.setMaxFrameLength(32 * 1024); // The decoder doesn't successfully decode due to oversized frame. - EXPECT_FALSE(decoder.decode(buffer, frames)); + EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kResourceExhausted); EXPECT_EQ(buffer.length(), size); } @@ -275,7 +275,7 @@ TEST(GrpcCodecTest, decodeSingleFrameWithMultiBuffersOverLimit) { // Both decoding attempts failed due to the total frame size exceeding the limit. for (uint32_t i = 0; i < buffers.size(); ++i) { - EXPECT_FALSE(decoder.decode(buffers[i], frames)); + EXPECT_EQ(decoder.decode(buffers[i], frames).code(), absl::StatusCode::kResourceExhausted); } EXPECT_EQ(frames.size(), 0); @@ -309,7 +309,7 @@ TEST(GrpcCodecTest, decodeMultipleFramesOverLimit) { Decoder decoder; decoder.setMaxFrameLength(32 * 1024); - EXPECT_FALSE(decoder.decode(buffer, frames)); + EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kResourceExhausted); // When the decoder doesn't successfully decode, it puts valid frames up until // an oversized frame into output frame vector. ASSERT_EQ(frames.size(), 1); diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index 7e1799ae5442..4ddab1b2bc27 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -5139,7 +5139,7 @@ class GrpcHealthCheckerImplTestBase : public Event::TestUsingSimulatedTime, .WillOnce(Invoke([&](Buffer::Instance& data, bool) { std::vector decoded_frames; Grpc::Decoder decoder; - ASSERT_TRUE(decoder.decode(data, decoded_frames)); + ASSERT_TRUE(decoder.decode(data, decoded_frames).ok()); ASSERT_EQ(1U, decoded_frames.size()); auto& frame = decoded_frames[0]; Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_)); @@ -5323,7 +5323,7 @@ TEST_F(GrpcHealthCheckerImplTest, SuccessWithAdditionalHeaders) { .WillOnce(Invoke([&](Buffer::Instance& data, bool) { std::vector decoded_frames; Grpc::Decoder decoder; - ASSERT_TRUE(decoder.decode(data, decoded_frames)); + ASSERT_TRUE(decoder.decode(data, decoded_frames).ok()); ASSERT_EQ(1U, decoded_frames.size()); auto& frame = decoded_frames[0]; Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_)); diff --git a/test/extensions/filters/http/grpc_field_extraction/message_converter/message_converter_test_lib.h b/test/extensions/filters/http/grpc_field_extraction/message_converter/message_converter_test_lib.h index b451e3a3fc02..3703b489d03d 100644 --- a/test/extensions/filters/http/grpc_field_extraction/message_converter/message_converter_test_lib.h +++ b/test/extensions/filters/http/grpc_field_extraction/message_converter/message_converter_test_lib.h @@ -25,7 +25,7 @@ void checkSerializedData(Envoy::Buffer::Instance& data, std::vector expected_requests) { ::Envoy::Grpc::Decoder grpc_decoder; std::vector<::Envoy::Grpc::Frame> frames_after_processing; - ASSERT_TRUE(grpc_decoder.decode(data, frames_after_processing)); + ASSERT_TRUE(grpc_decoder.decode(data, frames_after_processing).ok()); ASSERT_EQ(expected_requests.size(), frames_after_processing.size()); for (unsigned long i = 0; i < frames_after_processing.size(); i++) { diff --git a/test/extensions/filters/http/grpc_http1_reverse_bridge/reverse_bridge_test.cc b/test/extensions/filters/http/grpc_http1_reverse_bridge/reverse_bridge_test.cc index 8c6609dbf17f..f3b5f36c90b1 100644 --- a/test/extensions/filters/http/grpc_http1_reverse_bridge/reverse_bridge_test.cc +++ b/test/extensions/filters/http/grpc_http1_reverse_bridge/reverse_bridge_test.cc @@ -293,7 +293,7 @@ TEST_F(ReverseBridgeTest, GrpcRequest) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(buffer, frames); + std::ignore = decoder.decode(buffer, frames); EXPECT_EQ(1, frames.size()); EXPECT_EQ(12, frames[0].length_); @@ -376,7 +376,7 @@ TEST_F(ReverseBridgeTest, GrpcRequestNoContentLength) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(buffer, frames); + std::ignore = decoder.decode(buffer, frames); EXPECT_EQ(1, frames.size()); EXPECT_EQ(12, frames[0].length_); @@ -509,7 +509,7 @@ TEST_F(ReverseBridgeTest, GrpcRequestInternalError) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(buffer, frames); + std::ignore = decoder.decode(buffer, frames); EXPECT_EQ(1, frames.size()); EXPECT_EQ(12, frames[0].length_); @@ -722,7 +722,7 @@ TEST_F(ReverseBridgeTest, FilterConfigPerRouteEnabled) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(buffer, frames); + std::ignore = decoder.decode(buffer, frames); EXPECT_EQ(1, frames.size()); EXPECT_EQ(12, frames[0].length_); @@ -799,7 +799,7 @@ TEST_F(ReverseBridgeTest, RouteWithTrailers) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(buffer, frames); + std::ignore = decoder.decode(buffer, frames); EXPECT_EQ(4, trailers.size()); EXPECT_EQ(1, frames.size()); diff --git a/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc b/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc index 56515243009a..637e71e3ac3e 100644 --- a/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc +++ b/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc @@ -105,7 +105,7 @@ name: grpc_json_transcoder if (!expected_grpc_request_messages.empty()) { Grpc::Decoder grpc_decoder; std::vector frames; - ASSERT_TRUE(grpc_decoder.decode(upstream_request_->body(), frames)) << dump; + ASSERT_TRUE(grpc_decoder.decode(upstream_request_->body(), frames).ok()) << dump; EXPECT_EQ(expected_grpc_request_messages.size(), frames.size()); for (size_t i = 0; i < expected_grpc_request_messages.size(); ++i) { diff --git a/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc b/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc index 4b940cdac87a..0c28bee553a3 100644 --- a/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc +++ b/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc @@ -570,7 +570,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPost) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(request_data, frames); + std::ignore = decoder.decode(request_data, frames); EXPECT_EQ(1, frames.size()); @@ -642,7 +642,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPostWithPackageServiceMetho Grpc::Decoder decoder; std::vector frames; - decoder.decode(request_data, frames); + std::ignore = decoder.decode(request_data, frames); EXPECT_EQ(1, frames.size()); @@ -704,7 +704,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, ForwardUnaryPostGrpc) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(*request_data, frames); + std::ignore = decoder.decode(*request_data, frames); EXPECT_EQ(1, frames.size()); @@ -737,7 +737,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, ForwardUnaryPostGrpc) { EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.encodeData(*response_data, true)); frames.clear(); - decoder.decode(*response_data, frames); + std::ignore = decoder.decode(*response_data, frames); EXPECT_EQ(1, frames.size()); @@ -1089,7 +1089,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPostWithHttpBody) { // decodeData with EOS will output the grpc frame. std::vector frames; Grpc::Decoder decoder; - decoder.decode(buffer, frames); + std::ignore = decoder.decode(buffer, frames); ASSERT_EQ(frames.size(), 1); bookstore::EchoBodyRequest expected_request; @@ -1197,7 +1197,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingStreamPostWithHttpBody) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(buffer, frames); + std::ignore = decoder.decode(buffer, frames); EXPECT_EQ(frames.size(), 1); bookstore::EchoBodyRequest expected_request; @@ -1828,7 +1828,7 @@ TEST_P(GrpcJsonTranscoderFilterUnescapeTest, UnescapeSpec) { Grpc::Decoder decoder; std::vector frames; - decoder.decode(request_data, frames); + std::ignore = decoder.decode(request_data, frames); EXPECT_EQ(1, frames.size()); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 78bc9b1566aa..c2bb672eadae 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -198,7 +198,7 @@ class FakeStream : public Http::RequestDecoder, { absl::MutexLock lock(&lock_); last_body_size = body_.length(); - if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) { + if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) { return testing::AssertionFailure() << "Couldn't decode gRPC data frame: " << body_.toString(); } @@ -210,7 +210,7 @@ class FakeStream : public Http::RequestDecoder, } { absl::MutexLock lock(&lock_); - if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) { + if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) { return testing::AssertionFailure() << "Couldn't decode gRPC data frame: " << body_.toString(); }