diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 09ff8f1424c4..e2d58c36b85b 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -48,6 +48,9 @@ Version history local configuration. * http: added the ability to pass DNS type Subject Alternative Names of the client certificate in the :ref:`config_http_conn_man_headers_x-forwarded-client-cert` header. +* http: local responses to gRPC requests are now sent as trailers-only gRPC responses instead of plain HTTP responses. + Notably the HTTP response code is always "200" in this case, and the gRPC error code is carried in "grpc-status" + header, optionally accompanied with a text message in "grpc-message" header. * listeners: added :ref:`tcp_fast_open_queue_length ` option. * load balancing: added :ref:`weighted round robin ` support. The round robin diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 213e5b359b97..2c3825c0b1c7 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -190,6 +190,21 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { */ virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE; + /** + * Create a locally generated response using the provided response_code and body_text parameters. + * If the request was a gRPC request the local reply will be encoded as a gRPC response with a 200 + * HTTP response code and grpc-status and grpc-message headers mapped from the provided + * parameters. + * + * @param response_code supplies the HTTP response code. + * @param body_text supplies the optional body text which is sent using the text/plain content + * type, or encoded in the grpc-message header. + * @param modify_headers supplies an optional callback function that can modify the + * response headers. + */ + virtual void sendLocalReply(Code response_code, const std::string& body_text, + std::function modify_headers) PURE; + /** * Called with 100-Continue headers to be encoded. * diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index de6cc5264fd1..956ac2552c25 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -46,13 +46,21 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "status_lib", + srcs = ["status.cc"], + hdrs = ["status.h"], + deps = [ + "//include/envoy/grpc:status", + ], +) + envoy_cc_library( name = "common_lib", srcs = ["common.cc"], hdrs = ["common.h"], external_deps = ["abseil_optional"], deps = [ - "//include/envoy/grpc:status", "//include/envoy/http:header_map_interface", "//include/envoy/http:message_interface", "//include/envoy/stats:stats_interface", @@ -64,6 +72,7 @@ envoy_cc_library( "//source/common/common:enum_to_int", "//source/common/common:macros", "//source/common/common:utility_lib", + "//source/common/grpc:status_lib", "//source/common/http:filter_utility_lib", "//source/common/http:headers_lib", "//source/common/http:message_lib", diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 79b6d92b82bb..7c54c37c3e2b 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -106,7 +106,7 @@ void AsyncStreamImpl::onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) { } // Technically this should be // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md - // as given by Common::httpToGrpcStatus(), but the Google gRPC client treats + // as given by Grpc::Utility::httpToGrpcStatus(), but the Google gRPC client treats // this as GrpcStatus::Canceled. streamError(Status::GrpcStatus::Canceled); return; diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 6f7714cc5156..b3f36d52d0aa 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -124,88 +124,6 @@ bool Common::resolveServiceAndMethod(const Http::HeaderEntry* path, std::string* return true; } -Status::GrpcStatus Common::httpToGrpcStatus(uint64_t http_response_status) { - // From - // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. - switch (http_response_status) { - case 400: - return Status::GrpcStatus::Internal; - case 401: - return Status::GrpcStatus::Unauthenticated; - case 403: - return Status::GrpcStatus::PermissionDenied; - case 404: - return Status::GrpcStatus::Unimplemented; - case 429: - case 502: - case 503: - case 504: - return Status::GrpcStatus::Unavailable; - default: - return Status::GrpcStatus::Unknown; - } -} - -uint64_t Common::grpcToHttpStatus(Status::GrpcStatus grpc_status) { - // From https://cloud.google.com/apis/design/errors#handling_errors. - switch (grpc_status) { - case Status::GrpcStatus::Ok: - return 200; - case Status::GrpcStatus::Canceled: - // Client closed request. - return 499; - case Status::GrpcStatus::Unknown: - // Internal server error. - return 500; - case Status::GrpcStatus::InvalidArgument: - // Bad request. - return 400; - case Status::GrpcStatus::DeadlineExceeded: - // Gateway Time-out. - return 504; - case Status::GrpcStatus::NotFound: - // Not found. - return 404; - case Status::GrpcStatus::AlreadyExists: - // Conflict. - return 409; - case Status::GrpcStatus::PermissionDenied: - // Forbidden. - return 403; - case Status::GrpcStatus::ResourceExhausted: - // Too many requests. - return 429; - case Status::GrpcStatus::FailedPrecondition: - // Bad request. - return 400; - case Status::GrpcStatus::Aborted: - // Conflict. - return 409; - case Status::GrpcStatus::OutOfRange: - // Bad request. - return 400; - case Status::GrpcStatus::Unimplemented: - // Not implemented. - return 501; - case Status::GrpcStatus::Internal: - // Internal server error. - return 500; - case Status::GrpcStatus::Unavailable: - // Service unavailable. - return 503; - case Status::GrpcStatus::DataLoss: - // Internal server error. - return 500; - case Status::GrpcStatus::Unauthenticated: - // Unauthorized. - return 401; - case Status::GrpcStatus::InvalidCode: - default: - // Internal server error. - return 500; - } -} - Buffer::InstancePtr Common::serializeBody(const Protobuf::Message& message) { // http://www.grpc.io/docs/guides/wire.html // Reserve enough space for the entire message and the 5 byte header. diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 910018bcfd53..5e66dd601d8a 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -10,6 +10,7 @@ #include "envoy/http/message.h" #include "envoy/stats/stats.h" +#include "common/grpc/status.h" #include "common/protobuf/protobuf.h" #include "absl/types/optional.h" @@ -56,21 +57,6 @@ class Common { */ static std::string getGrpcMessage(const Http::HeaderMap& trailers); - /** - * Returns the gRPC status code from a given HTTP response status code. Ordinarily, it is expected - * that a 200 response is provided, but gRPC defines a mapping for intermediaries that are not - * gRPC aware, see https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. - * @param http_response_status HTTP status code. - * @return Status::GrpcStatus corresponding gRPC status code. - */ - static Status::GrpcStatus httpToGrpcStatus(uint64_t http_response_status); - - /** - * @param grpc_status gRPC status from grpc-status header. - * @return uint64_t the canonical HTTP status code corresponding to a gRPC status code. - */ - static uint64_t grpcToHttpStatus(Status::GrpcStatus grpc_status); - /** * Charge a success/failure stat to a cluster/service/method. * @param cluster supplies the target cluster. diff --git a/source/common/grpc/status.cc b/source/common/grpc/status.cc new file mode 100644 index 000000000000..70fdcce02711 --- /dev/null +++ b/source/common/grpc/status.cc @@ -0,0 +1,89 @@ +#include "common/grpc/status.h" + +namespace Envoy { +namespace Grpc { + +Status::GrpcStatus Utility::httpToGrpcStatus(uint64_t http_response_status) { + // From + // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + switch (http_response_status) { + case 400: + return Status::GrpcStatus::Internal; + case 401: + return Status::GrpcStatus::Unauthenticated; + case 403: + return Status::GrpcStatus::PermissionDenied; + case 404: + return Status::GrpcStatus::Unimplemented; + case 429: + case 502: + case 503: + case 504: + return Status::GrpcStatus::Unavailable; + default: + return Status::GrpcStatus::Unknown; + } +} + +uint64_t Utility::grpcToHttpStatus(Status::GrpcStatus grpc_status) { + // From https://cloud.google.com/apis/design/errors#handling_errors. + switch (grpc_status) { + case Status::GrpcStatus::Ok: + return 200; + case Status::GrpcStatus::Canceled: + // Client closed request. + return 499; + case Status::GrpcStatus::Unknown: + // Internal server error. + return 500; + case Status::GrpcStatus::InvalidArgument: + // Bad request. + return 400; + case Status::GrpcStatus::DeadlineExceeded: + // Gateway Time-out. + return 504; + case Status::GrpcStatus::NotFound: + // Not found. + return 404; + case Status::GrpcStatus::AlreadyExists: + // Conflict. + return 409; + case Status::GrpcStatus::PermissionDenied: + // Forbidden. + return 403; + case Status::GrpcStatus::ResourceExhausted: + // Too many requests. + return 429; + case Status::GrpcStatus::FailedPrecondition: + // Bad request. + return 400; + case Status::GrpcStatus::Aborted: + // Conflict. + return 409; + case Status::GrpcStatus::OutOfRange: + // Bad request. + return 400; + case Status::GrpcStatus::Unimplemented: + // Not implemented. + return 501; + case Status::GrpcStatus::Internal: + // Internal server error. + return 500; + case Status::GrpcStatus::Unavailable: + // Service unavailable. + return 503; + case Status::GrpcStatus::DataLoss: + // Internal server error. + return 500; + case Status::GrpcStatus::Unauthenticated: + // Unauthorized. + return 401; + case Status::GrpcStatus::InvalidCode: + default: + // Internal server error. + return 500; + } +} + +} // namespace Grpc +} // namespace Envoy diff --git a/source/common/grpc/status.h b/source/common/grpc/status.h new file mode 100644 index 000000000000..6355e002c2a9 --- /dev/null +++ b/source/common/grpc/status.h @@ -0,0 +1,32 @@ +#pragma once + +#include + +#include "envoy/grpc/status.h" + +namespace Envoy { +namespace Grpc { + +/** + * Grpc::Status utilities. + */ +class Utility { +public: + /** + * Returns the gRPC status code from a given HTTP response status code. Ordinarily, it is expected + * that a 200 response is provided, but gRPC defines a mapping for intermediaries that are not + * gRPC aware, see https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + * @param http_response_status HTTP status code. + * @return Status::GrpcStatus corresponding gRPC status code. + */ + static Status::GrpcStatus httpToGrpcStatus(uint64_t http_response_status); + + /** + * @param grpc_status gRPC status from grpc-status header. + * @return uint64_t the canonical HTTP status code corresponding to a gRPC status code. + */ + static uint64_t grpcToHttpStatus(Status::GrpcStatus grpc_status); +}; + +} // namespace Grpc +} // namespace Envoy diff --git a/source/common/http/BUILD b/source/common/http/BUILD index 09f1a2a4f959..a52b06f93b50 100644 --- a/source/common/http/BUILD +++ b/source/common/http/BUILD @@ -247,6 +247,7 @@ envoy_cc_library( "//source/common/common:empty_string", "//source/common/common:enum_to_int", "//source/common/common:utility_lib", + "//source/common/grpc:status_lib", "//source/common/json:json_loader_lib", "//source/common/network:utility_lib", "//source/common/protobuf:utility_lib", diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 4171785e056b..19ccbb18cc10 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -6,6 +6,7 @@ #include #include +#include "common/grpc/common.h" #include "common/http/utility.h" namespace Envoy { @@ -110,6 +111,7 @@ void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) { } void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) { + is_grpc_request_ = Grpc::Common::hasGrpcContentType(headers); headers.insertEnvoyInternalRequest().value().setReference( Headers::get().EnvoyInternalRequestValues.True); Utility::appendXff(headers, *parent_.config_.local_info_.address()); diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index a01e4061e12f..1b7967adf7ea 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -261,6 +261,19 @@ class AsyncStreamImpl : public AsyncClient::Stream, void continueDecoding() override { NOT_IMPLEMENTED; } void addDecodedData(Buffer::Instance&, bool) override { NOT_IMPLEMENTED; } const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); } + void sendLocalReply(Code code, const std::string& body, + std::function modify_headers) override { + Utility::sendLocalReply( + is_grpc_request_, + [this, modify_headers](HeaderMapPtr&& headers, bool end_stream) -> void { + if (modify_headers != nullptr) { + modify_headers(*headers); + } + encodeHeaders(std::move(headers), end_stream); + }, + [this](Buffer::Instance& data, bool end_stream) -> void { encodeData(data, end_stream); }, + remote_closed_, code, body); + } // The async client won't pause if sending an Expect: 100-Continue so simply // swallows any incoming encode100Continue. void encode100ContinueHeaders(HeaderMapPtr&&) override {} @@ -284,6 +297,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, bool local_closed_{}; bool remote_closed_{}; Buffer::InstancePtr buffered_body_; + bool is_grpc_request_{}; friend class AsyncClientImpl; }; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 9766ef269f6a..c58dd891a789 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -473,10 +473,8 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, // The protocol may have shifted in the HTTP/1.0 case so reset it. request_info_.protocol(protocol); if (!connection_manager_.config_.http1Settings().accept_http_10_) { - // Send "Upgrade Required" if HTTP/1.0 support is not expliictly configured on. - HeaderMapImpl headers{ - {Headers::get().Status, std::to_string(enumToInt(Code::UpgradeRequired))}}; - encodeHeaders(nullptr, headers, true); + // Send "Upgrade Required" if HTTP/1.0 support is not explictly configured on. + sendLocalReply(false, Code::UpgradeRequired, "", nullptr); return; } else { // HTTP/1.0 defaults to single-use connections. Make sure the connection @@ -499,8 +497,8 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, connection_manager_.config_.http1Settings().default_host_for_http_10_); } else { // Require host header. For HTTP/1.1 Host has already been translated to :authority. - HeaderMapImpl headers{{Headers::get().Status, std::to_string(enumToInt(Code::BadRequest))}}; - encodeHeaders(nullptr, headers, true); + sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::BadRequest, "", + nullptr); return; } } @@ -513,9 +511,8 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, // header size http_parser and nghttp2 will allow, down to 16k or 8k for // envoy users who do not wish to proxy large headers. if (request_headers_->byteSize() > (60 * 1024)) { - HeaderMapImpl headers{ - {Headers::get().Status, std::to_string(enumToInt(Code::RequestHeaderFieldsTooLarge))}}; - encodeHeaders(nullptr, headers, true); + sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), + Code::RequestHeaderFieldsTooLarge, "", nullptr); return; } @@ -526,8 +523,8 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, // don't support that currently. if (!request_headers_->Path() || request_headers_->Path()->value().c_str()[0] != '/') { connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc(); - HeaderMapImpl headers{{Headers::get().Status, std::to_string(enumToInt(Code::NotFound))}}; - encodeHeaders(nullptr, headers, true); + sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::NotFound, "", + nullptr); return; } @@ -570,8 +567,8 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, } else if (websocket_requested) { // Do not allow WebSocket upgrades if the route does not support it. connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc(); - HeaderMapImpl headers{{Headers::get().Status, std::to_string(enumToInt(Code::Forbidden))}}; - encodeHeaders(nullptr, headers, true); + sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::Forbidden, "", + nullptr); return; } // Allow non websocket requests to go through websocket enabled routes. @@ -653,7 +650,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte for (; entry != decoder_filters_.end(); entry++) { ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders)); state_.filter_call_state_ |= FilterCallState::DecodeHeaders; - FilterHeadersStatus status = (*entry)->handle_->decodeHeaders( + FilterHeadersStatus status = (*entry)->decodeHeaders( headers, end_stream && continue_data_entry == decoder_filters_.end()); state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders; ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this, @@ -821,6 +818,27 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { cached_route_ = std::move(route); } +void ConnectionManagerImpl::ActiveStream::sendLocalReply( + bool is_grpc_request, Code code, const std::string& body, + std::function modify_headers) { + Utility::sendLocalReply(is_grpc_request, + [this, modify_headers](HeaderMapPtr&& headers, bool end_stream) -> void { + if (modify_headers != nullptr) { + modify_headers(*headers); + } + response_headers_ = std::move(headers); + // TODO: Start encoding from the last decoder filter that saw the + // request instead. + encodeHeaders(nullptr, *response_headers_, end_stream); + }, + [this](Buffer::Instance& data, bool end_stream) -> void { + // TODO: Start encoding from the last decoder filter that saw the + // request instead. + encodeData(nullptr, data, end_stream); + }, + state_.destroyed_, code, body); +} + void ConnectionManagerImpl::ActiveStream::encode100ContinueHeaders( ActiveStreamEncoderFilter* filter, HeaderMap& headers) { ASSERT(connection_manager_.config_.proxy100Continue()); @@ -1316,8 +1334,7 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::requestDataTooLarge() { onDecoderFilterAboveWriteBufferHighWatermark(); } else { parent_.connection_manager_.stats_.named_.downstream_rq_too_large_.inc(); - Http::Utility::sendLocalReply(*this, parent_.state_.destroyed_, Http::Code::PayloadTooLarge, - CodeUtility::toString(Http::Code::PayloadTooLarge)); + sendLocalReply(Code::PayloadTooLarge, CodeUtility::toString(Code::PayloadTooLarge), nullptr); } } @@ -1389,18 +1406,20 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter::responseDataTooLarge() { parent_.state_.encoder_filters_streaming_ = true; stopped_ = false; - Http::Utility::sendLocalReply( - [&](HeaderMapPtr&& response_headers, bool end_stream) -> void { - parent_.response_headers_ = std::move(response_headers); - parent_.response_encoder_->encodeHeaders(*parent_.response_headers_, end_stream); - }, - [&](Buffer::Instance& data, bool end_stream) -> void { - parent_.response_encoder_->encodeData(data, end_stream); - parent_.state_.local_complete_ = end_stream; - parent_.maybeEndEncode(end_stream); - }, - parent_.state_.destroyed_, Http::Code::InternalServerError, - CodeUtility::toString(Http::Code::InternalServerError)); + Http::Utility::sendLocalReply(Grpc::Common::hasGrpcContentType(*parent_.request_headers_), + [&](HeaderMapPtr&& response_headers, bool end_stream) -> void { + parent_.response_headers_ = std::move(response_headers); + parent_.response_encoder_->encodeHeaders( + *parent_.response_headers_, end_stream); + parent_.state_.local_complete_ = end_stream; + }, + [&](Buffer::Instance& data, bool end_stream) -> void { + parent_.response_encoder_->encodeData(data, end_stream); + parent_.state_.local_complete_ = end_stream; + }, + parent_.state_.destroyed_, Http::Code::InternalServerError, + CodeUtility::toString(Http::Code::InternalServerError)); + parent_.maybeEndEncode(parent_.state_.local_complete_); } else { resetStream(); } diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 3062720905d6..9cefbfd254ef 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -25,8 +25,10 @@ #include "common/buffer/watermark_buffer.h" #include "common/common/linked_object.h" +#include "common/grpc/common.h" #include "common/http/conn_manager_config.h" #include "common/http/user_agent.h" +#include "common/http/utility.h" #include "common/request_info/request_info_impl.h" #include "common/tracing/http_tracer_impl.h" @@ -164,6 +166,10 @@ class ConnectionManagerImpl : Logger::Loggable, const Buffer::Instance* decodingBuffer() override { return parent_.buffered_request_data_.get(); } + void sendLocalReply(Code code, const std::string& body, + std::function modify_headers) override { + parent_.sendLocalReply(is_grpc_request_, code, body, modify_headers); + } void encode100ContinueHeaders(HeaderMapPtr&& headers) override; void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override; void encodeData(Buffer::Instance& data, bool end_stream) override; @@ -177,10 +183,19 @@ class ConnectionManagerImpl : Logger::Loggable, void setDecoderBufferLimit(uint32_t limit) override { parent_.setBufferLimit(limit); } uint32_t decoderBufferLimit() override { return parent_.buffer_limit_; } + // Each decoder filter instance checks if the request passed to the filter is gRPC + // so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders() + // called here may change the content type, so we must check it before the call. + FilterHeadersStatus decodeHeaders(HeaderMap& headers, bool end_stream) { + is_grpc_request_ = Grpc::Common::hasGrpcContentType(headers); + return handle_->decodeHeaders(headers, end_stream); + } + void requestDataTooLarge(); void requestDataDrained(); StreamDecoderFilterSharedPtr handle_; + bool is_grpc_request_{}; }; typedef std::unique_ptr ActiveStreamDecoderFilterPtr; @@ -257,6 +272,8 @@ class ConnectionManagerImpl : Logger::Loggable, void decodeTrailers(ActiveStreamDecoderFilter* filter, HeaderMap& trailers); void maybeEndDecode(bool end_stream); void addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data, bool streaming); + void sendLocalReply(bool is_grpc_request, Code code, const std::string& body, + std::function modify_headers); void encode100ContinueHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers); void encodeHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers, bool end_stream); void encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data, bool end_stream); diff --git a/source/common/http/utility.cc b/source/common/http/utility.cc index 3c159aa79dbd..c4dae19e254c 100644 --- a/source/common/http/utility.cc +++ b/source/common/http/utility.cc @@ -12,6 +12,7 @@ #include "common/common/enum_to_int.h" #include "common/common/fmt.h" #include "common/common/utility.h" +#include "common/grpc/status.h" #include "common/http/exception.h" #include "common/http/header_map_impl.h" #include "common/http/headers.h" @@ -210,30 +211,48 @@ Utility::parseHttp1Settings(const envoy::api::v2::core::Http1ProtocolOptions& co return ret; } -void Utility::sendLocalReply(StreamDecoderFilterCallbacks& callbacks, const bool& is_reset, - Code response_code, const std::string& body_text) { - sendLocalReply( - [&](HeaderMapPtr&& headers, bool end_stream) -> void { - callbacks.encodeHeaders(std::move(headers), end_stream); - }, - [&](Buffer::Instance& data, bool end_stream) -> void { - callbacks.encodeData(data, end_stream); - }, - is_reset, response_code, body_text); +void Utility::sendLocalReply(bool is_grpc, StreamDecoderFilterCallbacks& callbacks, + const bool& is_reset, Code response_code, + const std::string& body_text) { + sendLocalReply(is_grpc, + [&](HeaderMapPtr&& headers, bool end_stream) -> void { + callbacks.encodeHeaders(std::move(headers), end_stream); + }, + [&](Buffer::Instance& data, bool end_stream) -> void { + callbacks.encodeData(data, end_stream); + }, + is_reset, response_code, body_text); } void Utility::sendLocalReply( - std::function encode_headers, + bool is_grpc, std::function encode_headers, std::function encode_data, const bool& is_reset, Code response_code, const std::string& body_text) { + // encode_headers() may reset the stream, so the stream must not be reset before calling it. + ASSERT(!is_reset); + // Respond with a gRPC trailers-only response if the request is gRPC + if (is_grpc) { + HeaderMapPtr response_headers{new HeaderMapImpl{ + {Headers::get().Status, std::to_string(enumToInt(Code::OK))}, + {Headers::get().ContentType, Headers::get().ContentTypeValues.Grpc}, + {Headers::get().GrpcStatus, + std::to_string(enumToInt(Grpc::Utility::httpToGrpcStatus(enumToInt(response_code))))}}}; + if (!body_text.empty()) { + // TODO: GrpcMessage should be percent-encoded + response_headers->insertGrpcMessage().value(body_text); + } + encode_headers(std::move(response_headers), true); // Trailers only response + return; + } + HeaderMapPtr response_headers{ new HeaderMapImpl{{Headers::get().Status, std::to_string(enumToInt(response_code))}}}; if (!body_text.empty()) { response_headers->insertContentLength().value(body_text.size()); response_headers->insertContentType().value(Headers::get().ContentTypeValues.Text); } - encode_headers(std::move(response_headers), body_text.empty()); + // encode_headers()) may have changed the referenced is_reset so we need to test it if (!body_text.empty() && !is_reset) { Buffer::OwnedImpl buffer(body_text); encode_data(buffer, true); diff --git a/source/common/http/utility.h b/source/common/http/utility.h index 1e1e5fc54ed6..ca6802b9af49 100644 --- a/source/common/http/utility.h +++ b/source/common/http/utility.h @@ -5,6 +5,7 @@ #include #include "envoy/api/v2/core/protocol.pb.h" +#include "envoy/grpc/status.h" #include "envoy/http/codes.h" #include "envoy/http/filter.h" @@ -104,6 +105,7 @@ class Utility { /** * Create a locally generated response using filter callbacks. + * @param is_grpc tells if this is a response to a gRPC request. * @param callbacks supplies the filter callbacks to use. * @param is_reset boolean reference that indicates whether a stream has been reset. It is the * responsibility of the caller to ensure that this is set to false if onDestroy() @@ -112,10 +114,13 @@ class Utility { * @param body_text supplies the optional body text which is sent using the text/plain content * type. */ - static void sendLocalReply(StreamDecoderFilterCallbacks& callbacks, const bool& is_reset, - Code response_code, const std::string& body_text); + static void sendLocalReply(bool is_grpc, StreamDecoderFilterCallbacks& callbacks, + const bool& is_reset, Code response_code, + const std::string& body_text); + /** * Create a locally generated response using the provided lambdas. + * @param is_grpc tells if this is a response to a gRPC request. * @param encode_headers supplies the function to encode response headers. * @param encode_data supplies the function to encode the response body. * @param is_reset boolean reference that indicates whether a stream has been reset. It is the @@ -126,7 +131,8 @@ class Utility { * type. */ static void - sendLocalReply(std::function encode_headers, + sendLocalReply(bool is_grpc, + std::function encode_headers, std::function encode_data, const bool& is_reset, Code response_code, const std::string& body_text); diff --git a/source/common/router/router.cc b/source/common/router/router.cc index b2d577f7557c..9e4857c09292 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -172,23 +172,6 @@ void Filter::chargeUpstreamCode(Http::Code code, chargeUpstreamCode(response_status_code, fake_response_headers, upstream_host, dropped); } -void Filter::sendLocalReply(Http::Code code, const std::string& body, - std::function modify_headers) { - // This is a customized version of send local reply that allows us to set the overloaded - // header. - Http::Utility::sendLocalReply( - [this, modify_headers](Http::HeaderMapPtr&& headers, bool end_stream) -> void { - if (headers != nullptr && modify_headers != nullptr) { - modify_headers(*headers); - } - callbacks_->encodeHeaders(std::move(headers), end_stream); - }, - [this](Buffer::Instance& data, bool end_stream) -> void { - callbacks_->encodeData(data, end_stream); - }, - stream_destroyed_, code, body); -} - Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool end_stream) { // Do a common header check. We make sure that all outgoing requests have all HTTP/2 headers. // These get stripped by HTTP/1 codec where applicable. @@ -198,6 +181,9 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e downstream_headers_ = &headers; + // TODO: Maybe add a filter API for this. + grpc_request_ = Grpc::Common::hasGrpcContentType(headers); + // Only increment rq total stat if we actually decode headers here. This does not count requests // that get handled by earlier filters. config_.stats_.rq_total_.inc(); @@ -210,9 +196,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e headers.Path()->value().c_str()); callbacks_->requestInfo().setResponseFlag(RequestInfo::ResponseFlag::NoRouteFound); - Http::HeaderMapPtr response_headers{new Http::HeaderMapImpl{ - {Http::Headers::get().Status, std::to_string(enumToInt(Http::Code::NotFound))}}}; - callbacks_->encodeHeaders(std::move(response_headers), true); + callbacks_->sendLocalReply(Http::Code::NotFound, "", nullptr); return Http::FilterHeadersStatus::StopIteration; } @@ -221,7 +205,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e if (direct_response != nullptr) { config_.stats_.rq_direct_response_.inc(); direct_response->rewritePathHeader(headers); - sendLocalReply( + callbacks_->sendLocalReply( direct_response->responseCode(), direct_response->responseBody(), [ this, direct_response, &request_headers = headers ](Http::HeaderMap & response_headers) ->void { @@ -242,10 +226,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName()); callbacks_->requestInfo().setResponseFlag(RequestInfo::ResponseFlag::NoRouteFound); - Http::HeaderMapPtr response_headers{new Http::HeaderMapImpl{ - {Http::Headers::get().Status, - std::to_string(enumToInt(route_entry_->clusterNotFoundResponseCode()))}}}; - callbacks_->encodeHeaders(std::move(response_headers), true); + callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", nullptr); return Http::FilterHeadersStatus::StopIteration; } cluster_ = cluster->info(); @@ -265,7 +246,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e if (cluster_->maintenanceMode()) { callbacks_->requestInfo().setResponseFlag(RequestInfo::ResponseFlag::UpstreamOverflow); chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true); - sendLocalReply( + callbacks_->sendLocalReply( Http::Code::ServiceUnavailable, "maintenance mode", [](Http::HeaderMap& headers) { headers.insertEnvoyOverloaded().value(Http::Headers::get().EnvoyOverloadedValues.True); }); @@ -302,7 +283,6 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers); - grpc_request_ = Grpc::Common::hasGrpcContentType(headers); upstream_request_.reset(new UpstreamRequest(*this, *conn_pool)); upstream_request_->encodeHeaders(end_stream); if (end_stream) { @@ -331,7 +311,7 @@ Http::ConnectionPool::Instance* Filter::getConnPool() { void Filter::sendNoHealthyUpstreamResponse() { callbacks_->requestInfo().setResponseFlag(RequestInfo::ResponseFlag::NoHealthyUpstream); chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, false); - sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream"); + callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", nullptr); } Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) { @@ -518,7 +498,7 @@ void Filter::onUpstreamReset(UpstreamResetType type, if (upstream_host != nullptr && !Http::CodeUtility::is5xx(enumToInt(code))) { upstream_host->stats().rq_error_.inc(); } - sendLocalReply(code, body, [dropped](Http::HeaderMap& headers) { + callbacks_->sendLocalReply(code, body, [dropped](Http::HeaderMap& headers) { if (dropped) { headers.insertEnvoyOverloaded().value(Http::Headers::get().EnvoyOverloadedValues.True); } @@ -553,7 +533,7 @@ void Filter::handleNon5xxResponseHeaders(const Http::HeaderMap& headers, bool en if (end_stream) { absl::optional grpc_status = Grpc::Common::getGrpcStatus(headers); if (grpc_status && - !Http::CodeUtility::is5xx(Grpc::Common::grpcToHttpStatus(grpc_status.value()))) { + !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) { upstream_request_->upstream_host_->stats().rq_success_.inc(); } else { upstream_request_->upstream_host_->stats().rq_error_.inc(); @@ -660,7 +640,7 @@ void Filter::onUpstreamTrailers(Http::HeaderMapPtr&& trailers) { if (upstream_request_->grpc_rq_success_deferred_) { absl::optional grpc_status = Grpc::Common::getGrpcStatus(*trailers); if (grpc_status && - !Http::CodeUtility::is5xx(Grpc::Common::grpcToHttpStatus(grpc_status.value()))) { + !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) { upstream_request_->upstream_host_->stats().rq_success_.inc(); } else { upstream_request_->upstream_host_->stats().rq_error_.inc(); diff --git a/source/common/router/router.h b/source/common/router/router.h index 6c8b483fa1ff..38908681b16d 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -337,16 +337,6 @@ class Filter : Logger::Loggable, // and handle difference between gRPC and non-gRPC requests. void handleNon5xxResponseHeaders(const Http::HeaderMap& headers, bool end_stream); - /** - * Send a locally generated (non-proxied) HTTP response. - * @param code supplies the HTTP status code. - * @param body supplies the response body (empty string if no body is needed). - * @param modify_headers supplies an optional callback function that can modify the - * response headers. - */ - void sendLocalReply(Http::Code code, const std::string& body, - std::function modify_headers = nullptr); - FilterConfig& config_; Http::StreamDecoderFilterCallbacks* callbacks_{}; RouteConstSharedPtr route_; diff --git a/source/common/upstream/health_checker_impl.cc b/source/common/upstream/health_checker_impl.cc index 509fbcd2fb6d..024ed6cc22b1 100644 --- a/source/common/upstream/health_checker_impl.cc +++ b/source/common/upstream/health_checker_impl.cc @@ -362,7 +362,7 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeHeaders( return; } } - onRpcComplete(Grpc::Common::httpToGrpcStatus(http_response_status), "non-200 HTTP response", + onRpcComplete(Grpc::Utility::httpToGrpcStatus(http_response_status), "non-200 HTTP response", end_stream); return; } diff --git a/source/extensions/filters/http/buffer/buffer_filter.cc b/source/extensions/filters/http/buffer/buffer_filter.cc index ce712a026df6..36b45f31fd93 100644 --- a/source/extensions/filters/http/buffer/buffer_filter.cc +++ b/source/extensions/filters/http/buffer/buffer_filter.cc @@ -107,14 +107,10 @@ BufferFilterStats BufferFilter::generateStats(const std::string& prefix, Stats:: return {ALL_BUFFER_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; } -void BufferFilter::onDestroy() { - resetInternalState(); - stream_destroyed_ = true; -} +void BufferFilter::onDestroy() { resetInternalState(); } void BufferFilter::onRequestTimeout() { - Http::Utility::sendLocalReply(*callbacks_, stream_destroyed_, Http::Code::RequestTimeout, - "buffer request timeout"); + callbacks_->sendLocalReply(Http::Code::RequestTimeout, "buffer request timeout", nullptr); config_->stats().rq_timeout_.inc(); } diff --git a/source/extensions/filters/http/buffer/buffer_filter.h b/source/extensions/filters/http/buffer/buffer_filter.h index 277a0f50f9d0..75228fe19191 100644 --- a/source/extensions/filters/http/buffer/buffer_filter.h +++ b/source/extensions/filters/http/buffer/buffer_filter.h @@ -92,7 +92,6 @@ class BufferFilter : public Http::StreamDecoderFilter { const BufferFilterSettings* settings_; Http::StreamDecoderFilterCallbacks* callbacks_{}; Event::TimerPtr request_timeout_; - bool stream_destroyed_{}; bool config_initialized_{}; }; diff --git a/source/extensions/filters/http/ext_authz/ext_authz.cc b/source/extensions/filters/http/ext_authz/ext_authz.cc index 70a1156569e5..b8d4a5c27a85 100644 --- a/source/extensions/filters/http/ext_authz/ext_authz.cc +++ b/source/extensions/filters/http/ext_authz/ext_authz.cc @@ -19,16 +19,6 @@ namespace Extensions { namespace HttpFilters { namespace ExtAuthz { -namespace { - -const Http::HeaderMap* getDeniedHeader() { - static const Http::HeaderMap* header_map = new Http::HeaderMapImpl{ - {Http::Headers::get().Status, std::to_string(enumToInt(Http::Code::Forbidden))}}; - return header_map; -} - -} // namespace - void Filter::initiateCall(const Http::HeaderMap& headers) { Router::RouteConstSharedPtr route = callbacks_->route(); if (route == nullptr || route->routeEntry() == nullptr) { @@ -112,8 +102,7 @@ void Filter::onComplete(Filters::Common::ExtAuthz::CheckStatus status) { // if there is an error contacting the service. if (status == CheckStatus::Denied || (status == CheckStatus::Error && !config_->failureModeAllow())) { - Http::HeaderMapPtr response_headers{new Http::HeaderMapImpl(*getDeniedHeader())}; - callbacks_->encodeHeaders(std::move(response_headers), true); + callbacks_->sendLocalReply(Http::Code::Forbidden, "", nullptr); callbacks_->requestInfo().setResponseFlag( RequestInfo::ResponseFlag::UnauthorizedExternalService); } else { diff --git a/source/extensions/filters/http/fault/fault_filter.cc b/source/extensions/filters/http/fault/fault_filter.cc index cf646324adba..9669b8b041d7 100644 --- a/source/extensions/filters/http/fault/fault_filter.cc +++ b/source/extensions/filters/http/fault/fault_filter.cc @@ -231,10 +231,7 @@ FaultFilterStats FaultFilterConfig::generateStats(const std::string& prefix, Sta return {ALL_FAULT_FILTER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix))}; } -void FaultFilter::onDestroy() { - resetTimerState(); - stream_destroyed_ = true; -} +void FaultFilter::onDestroy() { resetTimerState(); } void FaultFilter::postDelayInjection() { resetTimerState(); @@ -250,8 +247,8 @@ void FaultFilter::postDelayInjection() { void FaultFilter::abortWithHTTPStatus() { callbacks_->requestInfo().setResponseFlag(RequestInfo::ResponseFlag::FaultInjected); - Http::Utility::sendLocalReply(*callbacks_, stream_destroyed_, - static_cast(abortHttpStatus()), "fault filter abort"); + callbacks_->sendLocalReply(static_cast(abortHttpStatus()), "fault filter abort", + nullptr); recordAbortsInjectedStats(); } diff --git a/source/extensions/filters/http/fault/fault_filter.h b/source/extensions/filters/http/fault/fault_filter.h index 8c4336bf4362..7559a3003837 100644 --- a/source/extensions/filters/http/fault/fault_filter.h +++ b/source/extensions/filters/http/fault/fault_filter.h @@ -123,7 +123,6 @@ class FaultFilter : public Http::StreamDecoderFilter { Http::StreamDecoderFilterCallbacks* callbacks_{}; Event::TimerPtr delay_timer_; std::string downstream_cluster_{}; - bool stream_destroyed_{}; const FaultSettings* fault_settings_; std::string downstream_cluster_delay_percent_key_{}; diff --git a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc index 057234108dad..573d9eecb273 100644 --- a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc +++ b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.cc @@ -237,8 +237,8 @@ Http::FilterHeadersStatus JsonTranscoderFilter::decodeHeaders(Http::HeaderMap& h if (!request_status.ok()) { ENVOY_LOG(debug, "Transcoding request error {}", request_status.ToString()); error_ = true; - Http::Utility::sendLocalReply(*decoder_callbacks_, stream_reset_, Http::Code::BadRequest, - request_status.error_message()); + decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, request_status.error_message(), + nullptr); return Http::FilterHeadersStatus::StopIteration; } @@ -273,8 +273,8 @@ Http::FilterDataStatus JsonTranscoderFilter::decodeData(Buffer::Instance& data, if (!request_status.ok()) { ENVOY_LOG(debug, "Transcoding request error {}", request_status.ToString()); error_ = true; - Http::Utility::sendLocalReply(*decoder_callbacks_, stream_reset_, Http::Code::BadRequest, - request_status.error_message()); + decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, request_status.error_message(), + nullptr); return Http::FilterDataStatus::StopIterationNoBuffer; } @@ -377,7 +377,7 @@ Http::FilterTrailersStatus JsonTranscoderFilter::encodeTrailers(Http::HeaderMap& if (!grpc_status || grpc_status.value() == Grpc::Status::GrpcStatus::InvalidCode) { response_headers_->Status()->value(enumToInt(Http::Code::ServiceUnavailable)); } else { - response_headers_->Status()->value(Grpc::Common::grpcToHttpStatus(grpc_status.value())); + response_headers_->Status()->value(Grpc::Utility::grpcToHttpStatus(grpc_status.value())); response_headers_->insertGrpcStatus().value(enumToInt(grpc_status.value())); } diff --git a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.h b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.h index 1c70843ed304..e8544ea2600f 100644 --- a/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.h +++ b/source/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter.h @@ -113,7 +113,7 @@ class JsonTranscoderFilter : public Http::StreamFilter, public Logger::Loggable< void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override; // Http::StreamFilterBase - void onDestroy() override { stream_reset_ = true; } + void onDestroy() override {} private: bool readToBuffer(Protobuf::io::ZeroCopyInputStream& stream, Buffer::Instance& data); @@ -128,7 +128,6 @@ class JsonTranscoderFilter : public Http::StreamFilter, public Logger::Loggable< Http::HeaderMap* response_headers_{nullptr}; bool error_{false}; - bool stream_reset_{false}; }; } // namespace GrpcJsonTranscoder diff --git a/source/extensions/filters/http/grpc_web/grpc_web_filter.cc b/source/extensions/filters/http/grpc_web/grpc_web_filter.cc index 72c9a7ce1dfc..ffc0c0c20cfa 100644 --- a/source/extensions/filters/http/grpc_web/grpc_web_filter.cc +++ b/source/extensions/filters/http/grpc_web/grpc_web_filter.cc @@ -94,8 +94,8 @@ Http::FilterDataStatus GrpcWebFilter::decodeData(Buffer::Instance& data, bool en } if (available % 4 != 0) { // Client end stream with invalid base64. Note, base64 padding is mandatory. - Http::Utility::sendLocalReply(*decoder_callbacks_, stream_destroyed_, Http::Code::BadRequest, - "Bad gRPC-web request, invalid base64 data."); + decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, + "Bad gRPC-web request, invalid base64 data.", nullptr); return Http::FilterDataStatus::StopIterationNoBuffer; } } else if (available < 4) { @@ -110,8 +110,8 @@ Http::FilterDataStatus GrpcWebFilter::decodeData(Buffer::Instance& data, bool en decoding_buffer_.length())); if (decoded.empty()) { // Error happened when decoding base64. - Http::Utility::sendLocalReply(*decoder_callbacks_, stream_destroyed_, Http::Code::BadRequest, - "Bad gRPC-web request, invalid base64 data."); + decoder_callbacks_->sendLocalReply(Http::Code::BadRequest, + "Bad gRPC-web request, invalid base64 data.", nullptr); return Http::FilterDataStatus::StopIterationNoBuffer; } diff --git a/source/extensions/filters/http/grpc_web/grpc_web_filter.h b/source/extensions/filters/http/grpc_web/grpc_web_filter.h index 022c6ea853ca..f33cc8dca412 100644 --- a/source/extensions/filters/http/grpc_web/grpc_web_filter.h +++ b/source/extensions/filters/http/grpc_web/grpc_web_filter.h @@ -23,7 +23,7 @@ class GrpcWebFilter : public Http::StreamFilter, NonCopyable { virtual ~GrpcWebFilter(){}; // Http::StreamFilterBase - void onDestroy() override { stream_destroyed_ = true; }; + void onDestroy() override{}; // Implements StreamDecoderFilter. Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap&, bool) override; @@ -67,7 +67,6 @@ class GrpcWebFilter : public Http::StreamFilter, NonCopyable { std::string grpc_service_; std::string grpc_method_; bool do_stat_tracking_{}; - bool stream_destroyed_{}; bool is_grpc_web_request_{}; }; diff --git a/source/extensions/filters/http/health_check/health_check.cc b/source/extensions/filters/http/health_check/health_check.cc index e992280c2727..88fa02b1f692 100644 --- a/source/extensions/filters/http/health_check/health_check.cc +++ b/source/extensions/filters/http/health_check/health_check.cc @@ -89,13 +89,11 @@ Http::FilterHeadersStatus HealthCheckFilter::encodeHeaders(Http::HeaderMap& head void HealthCheckFilter::onComplete() { ASSERT(handling_); - Http::HeaderMapPtr headers; + Http::Code final_status = Http::Code::OK; if (context_.healthCheckFailed()) { callbacks_->requestInfo().setResponseFlag(RequestInfo::ResponseFlag::FailedLocalHealthCheck); - headers.reset(new Http::HeaderMapImpl{ - {Http::Headers::get().Status, std::to_string(enumToInt(Http::Code::ServiceUnavailable))}}); + final_status = Http::Code::ServiceUnavailable; } else { - Http::Code final_status = Http::Code::OK; if (cache_manager_) { final_status = cache_manager_->getCachedResponseCode(); } else if (cluster_min_healthy_percentages_ != nullptr && @@ -137,12 +135,9 @@ void HealthCheckFilter::onComplete() { if (!Http::CodeUtility::is2xx(enumToInt(final_status))) { callbacks_->requestInfo().setResponseFlag(RequestInfo::ResponseFlag::FailedLocalHealthCheck); } - - headers.reset(new Http::HeaderMapImpl{ - {Http::Headers::get().Status, std::to_string(enumToInt(final_status))}}); } - callbacks_->encodeHeaders(std::move(headers), true); + callbacks_->sendLocalReply(final_status, "", nullptr); } } // namespace HealthCheck diff --git a/source/extensions/filters/http/ratelimit/ratelimit.cc b/source/extensions/filters/http/ratelimit/ratelimit.cc index 67ae79116704..d63f0ca6cdd3 100644 --- a/source/extensions/filters/http/ratelimit/ratelimit.cc +++ b/source/extensions/filters/http/ratelimit/ratelimit.cc @@ -16,16 +16,6 @@ namespace Extensions { namespace HttpFilters { namespace RateLimitFilter { -namespace { - -static const Http::HeaderMap* getTooManyRequestsHeader() { - static const Http::HeaderMap* header_map = new Http::HeaderMapImpl{ - {Http::Headers::get().Status, std::to_string(enumToInt(Http::Code::TooManyRequests))}}; - return header_map; -} - -} // namespace - void Filter::initiateCall(const Http::HeaderMap& headers) { bool is_internal_request = headers.EnvoyInternalRequest() && (headers.EnvoyInternalRequest()->value() == "true"); @@ -133,8 +123,7 @@ void Filter::complete(RateLimit::LimitStatus status) { if (status == RateLimit::LimitStatus::OverLimit && config_->runtime().snapshot().featureEnabled("ratelimit.http_filter_enforcing", 100)) { state_ = State::Responded; - Http::HeaderMapPtr response_headers{new Http::HeaderMapImpl(*getTooManyRequestsHeader())}; - callbacks_->encodeHeaders(std::move(response_headers), true); + callbacks_->sendLocalReply(Http::Code::TooManyRequests, "", nullptr); callbacks_->requestInfo().setResponseFlag(RequestInfo::ResponseFlag::RateLimited); } else if (!initiating_call_) { callbacks_->continueDecoding(); diff --git a/test/common/grpc/common_test.cc b/test/common/grpc/common_test.cc index cd676270743d..c5231187033f 100644 --- a/test/common/grpc/common_test.cc +++ b/test/common/grpc/common_test.cc @@ -1,6 +1,7 @@ #include "common/grpc/common.h" #include "common/http/headers.h" #include "common/http/message_impl.h" +#include "common/http/utility.h" #include "test/mocks/upstream/mocks.h" #include "test/proto/helloworld.pb.h" @@ -121,7 +122,7 @@ TEST(GrpcCommonTest, GrpcToHttpStatus) { {Status::GrpcStatus::InvalidCode, 500}, }; for (const auto& test_case : test_set) { - EXPECT_EQ(test_case.second, Common::grpcToHttpStatus(test_case.first)); + EXPECT_EQ(test_case.second, Grpc::Utility::grpcToHttpStatus(test_case.first)); } } @@ -134,7 +135,7 @@ TEST(GrpcCommonTest, HttpToGrpcStatus) { {500, Status::GrpcStatus::Unknown}, }; for (const auto& test_case : test_set) { - EXPECT_EQ(test_case.second, Common::httpToGrpcStatus(test_case.first)); + EXPECT_EQ(test_case.second, Grpc::Utility::httpToGrpcStatus(test_case.first)); } } diff --git a/test/common/grpc/grpc_client_integration_test.cc b/test/common/grpc/grpc_client_integration_test.cc index 68795bf6bf24..bfac4dd7f03e 100644 --- a/test/common/grpc/grpc_client_integration_test.cc +++ b/test/common/grpc/grpc_client_integration_test.cc @@ -489,7 +489,7 @@ TEST_P(GrpcClientIntegrationTest, HttpNon200Status) { stream->expectTrailingMetadata(empty_metadata_); // Technically this should be // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md - // as given by Common::httpToGrpcStatus(), but the Google gRPC client treats + // as given by Grpc::Utility::httpToGrpcStatus(), but the Google gRPC client treats // this as GrpcStatus::Canceled. stream->expectGrpcStatus(Status::GrpcStatus::Canceled); stream->fake_stream_->encodeHeaders(reply_headers, true); diff --git a/test/common/http/utility_test.cc b/test/common/http/utility_test.cc index e6124eb6d0a3..c19c12e69347 100644 --- a/test/common/http/utility_test.cc +++ b/test/common/http/utility_test.cc @@ -14,6 +14,7 @@ #include "gtest/gtest.h" +using testing::Invoke; using testing::InvokeWithoutArgs; using testing::_; @@ -246,7 +247,22 @@ TEST(HttpUtility, SendLocalReply) { EXPECT_CALL(callbacks, encodeHeaders_(_, false)); EXPECT_CALL(callbacks, encodeData(_, true)); - Utility::sendLocalReply(callbacks, is_reset, Http::Code::PayloadTooLarge, "large"); + Utility::sendLocalReply(false, callbacks, is_reset, Http::Code::PayloadTooLarge, "large"); +} + +TEST(HttpUtility, SendLocalGrpcReply) { + MockStreamDecoderFilterCallbacks callbacks; + bool is_reset = false; + + EXPECT_CALL(callbacks, encodeHeaders_(_, true)) + .WillOnce(Invoke([&](const HeaderMap& headers, bool) -> void { + EXPECT_STREQ(headers.Status()->value().c_str(), "200"); + EXPECT_NE(headers.GrpcStatus(), nullptr); + EXPECT_STREQ(headers.GrpcStatus()->value().c_str(), "2"); // Unknown gRPC error. + EXPECT_NE(headers.GrpcMessage(), nullptr); + EXPECT_STREQ(headers.GrpcMessage()->value().c_str(), "large"); + })); + Utility::sendLocalReply(true, callbacks, is_reset, Http::Code::PayloadTooLarge, "large"); } TEST(HttpUtility, SendLocalReplyDestroyedEarly) { @@ -257,7 +273,7 @@ TEST(HttpUtility, SendLocalReplyDestroyedEarly) { is_reset = true; })); EXPECT_CALL(callbacks, encodeData(_, true)).Times(0); - Utility::sendLocalReply(callbacks, is_reset, Http::Code::PayloadTooLarge, "large"); + Utility::sendLocalReply(false, callbacks, is_reset, Http::Code::PayloadTooLarge, "large"); } TEST(HttpUtility, TestAppendHeader) { diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index 81c2c7321f1f..425554084786 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -101,6 +101,8 @@ TEST_P(Http2IntegrationTest, HittingDecoderFilterLimit) { testHittingDecoderFilt TEST_P(Http2IntegrationTest, HittingEncoderFilterLimit) { testHittingEncoderFilterLimit(); } +TEST_P(Http2IntegrationTest, GrpcRouterNotFound) { testGrpcRouterNotFound(); } + TEST_P(Http2IntegrationTest, GrpcRetry) { testGrpcRetry(); } // Send a request with overly large headers, and ensure it results in stream reset. diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index 9bc2dc6153d8..b697829131db 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -630,6 +630,21 @@ void HttpIntegrationTest::testRetry() { EXPECT_EQ(512U, response->body().size()); } +// Change the default route to be restrictive, and send a request to an alternate route. +void HttpIntegrationTest::testGrpcRouterNotFound() { + config_helper_.setDefaultHostAndRoute("foo.com", "/found"); + initialize(); + + BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( + lookupPort("http"), "POST", "/service/notfound", "", downstream_protocol_, version_, "host", + Http::Headers::get().ContentTypeValues.Grpc); + ASSERT_TRUE(response->complete()); + EXPECT_STREQ("200", response->headers().Status()->value().c_str()); + EXPECT_EQ(Http::Headers::get().ContentTypeValues.Grpc, + response->headers().ContentType()->value().c_str()); + EXPECT_STREQ("12", response->headers().GrpcStatus()->value().c_str()); +} + void HttpIntegrationTest::testGrpcRetry() { Http::TestHeaderMapImpl response_trailers{{"response1", "trailer1"}, {"grpc-status", "0"}}; initialize(); diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 99ff4a62e273..41e07e5bebfb 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -158,6 +158,7 @@ class HttpIntegrationTest : public BaseIntegrationTest { void testDrainClose(); void testRetry(); void testRetryHittingBufferLimit(); + void testGrpcRouterNotFound(); void testGrpcRetry(); void testHittingDecoderFilterLimit(); void testHittingEncoderFilterLimit(); diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index 888ade21a520..fc0726ae0cf9 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -147,13 +147,15 @@ TEST_P(IntegrationTest, HittingEncoderFilterLimitBufferingHeaders) { waitForNextUpstreamRequest(); // Send the overly large response. Because the grpc_http1_bridge filter buffers and buffer - // limits are sent, this will be translated into a 500 from Envoy. + // limits are exceeded, this will be translated into a 500 from Envoy. upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); upstream_request_->encodeData(1024 * 65, false); response->waitForEndStream(); EXPECT_TRUE(response->complete()); - EXPECT_STREQ("500", response->headers().Status()->value().c_str()); + EXPECT_STREQ("200", response->headers().Status()->value().c_str()); + EXPECT_NE(response->headers().GrpcStatus(), nullptr); + EXPECT_STREQ("2", response->headers().GrpcStatus()->value().c_str()); // Unknown gRPC error } TEST_P(IntegrationTest, HittingEncoderFilterLimit) { testHittingEncoderFilterLimit(); } diff --git a/test/integration/utility.cc b/test/integration/utility.cc index fa41c8b2e50c..4c8e4242c66b 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -55,7 +55,7 @@ BufferingStreamDecoderPtr IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPtr& addr, const std::string& method, const std::string& url, const std::string& body, Http::CodecClient::Type type, - const std::string& host) { + const std::string& host, const std::string& content_type) { Api::Impl api(std::chrono::milliseconds(9000)); Event::DispatcherPtr dispatcher(api.allocateDispatcher()); std::shared_ptr cluster{new NiceMock()}; @@ -78,6 +78,9 @@ IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPt headers.insertPath().value(url); headers.insertHost().value(host); headers.insertScheme().value(Http::Headers::get().SchemeValues.Http); + if (!content_type.empty()) { + headers.insertContentType().value(content_type); + } encoder.encodeHeaders(headers, body.empty()); if (!body.empty()) { Buffer::OwnedImpl body_buffer(body); @@ -88,12 +91,14 @@ IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPt return response; } -BufferingStreamDecoderPtr IntegrationUtil::makeSingleRequest( - uint32_t port, const std::string& method, const std::string& url, const std::string& body, - Http::CodecClient::Type type, Network::Address::IpVersion ip_version, const std::string& host) { +BufferingStreamDecoderPtr +IntegrationUtil::makeSingleRequest(uint32_t port, const std::string& method, const std::string& url, + const std::string& body, Http::CodecClient::Type type, + Network::Address::IpVersion ip_version, const std::string& host, + const std::string& content_type) { auto addr = Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(ip_version), port)); - return makeSingleRequest(addr, method, url, body, type, host); + return makeSingleRequest(addr, method, url, body, type, host, content_type); } RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, diff --git a/test/integration/utility.h b/test/integration/utility.h index c15b28306328..34c853fb2499 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -96,13 +96,14 @@ class IntegrationUtil { * @param body supplies the optional request body to send. * @param type supplies the codec to use for the request. * @param host supplies the host header to use for the request. + * @param content_type supplies the content-type header to use for the request, if any. * @return BufferingStreamDecoderPtr the complete request or a partial request if there was * remote easly disconnection. */ static BufferingStreamDecoderPtr makeSingleRequest(const Network::Address::InstanceConstSharedPtr& addr, const std::string& method, const std::string& url, const std::string& body, Http::CodecClient::Type type, - const std::string& host = "host"); + const std::string& host = "host", const std::string& content_type = ""); /** * Make a new connection, issues a request, and then disconnect when the request is complete. @@ -113,13 +114,15 @@ class IntegrationUtil { * @param type supplies the codec to use for the request. * @param version the IP addess version of the client and server. * @param host supplies the host header to use for the request. + * @param content_type supplies the content-type header to use for the request, if any. * @return BufferingStreamDecoderPtr the complete request or a partial request if there was * remote easly disconnection. */ static BufferingStreamDecoderPtr makeSingleRequest(uint32_t port, const std::string& method, const std::string& url, const std::string& body, Http::CodecClient::Type type, - Network::Address::IpVersion ip_version, const std::string& host = "host"); + Network::Address::IpVersion ip_version, const std::string& host = "host", + const std::string& content_type = ""); }; // A set of connection callbacks which tracks connection state. diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index a0aa273b9f64..015709cc02c0 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -14,6 +14,8 @@ #include "envoy/http/filter.h" #include "envoy/ssl/connection.h" +#include "common/http/utility.h" + #include "test/mocks/common.h" #include "test/mocks/event/mocks.h" #include "test/mocks/request_info/mocks.h" @@ -195,6 +197,19 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD0(decoderBufferLimit, uint32_t()); // Http::StreamDecoderFilterCallbacks + void sendLocalReply(Code code, const std::string& body, + std::function modify_headers) override { + Utility::sendLocalReply( + is_grpc_request_, + [this, modify_headers](HeaderMapPtr&& headers, bool end_stream) -> void { + if (modify_headers != nullptr) { + modify_headers(*headers); + } + encodeHeaders(std::move(headers), end_stream); + }, + [this](Buffer::Instance& data, bool end_stream) -> void { encodeData(data, end_stream); }, + stream_destroyed_, code, body); + } void encode100ContinueHeaders(HeaderMapPtr&& headers) override { encode100ContinueHeaders_(*headers); } @@ -215,6 +230,8 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, std::list callbacks_{}; testing::NiceMock active_span_; testing::NiceMock tracing_config_; + bool is_grpc_request_{}; + bool stream_destroyed_{}; }; class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks,