diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index e9b25c691e39..933e17b54dc6 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -22,6 +22,7 @@ envoy_cc_library( "//source/extensions/filters/http/common:pass_through_filter_lib", "@com_google_absl//absl/strings:str_format", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3alpha:pkg_cc_proto", + "@envoy_api//envoy/service/ext_proc/v3alpha:pkg_cc_proto", ], ) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 807a84de4ac3..874b35cf0ef4 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -9,6 +9,7 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { +using envoy::service::ext_proc::v3alpha::ImmediateResponse; using envoy::service::ext_proc::v3alpha::ProcessingRequest; using envoy::service::ext_proc::v3alpha::ProcessingResponse; @@ -66,8 +67,8 @@ void Filter::onReceiveMessage( } } } else if (response->has_immediate_response()) { - // To be implemented later. Leave stream open to allow people to implement - // correct servers that don't break us. + ENVOY_LOG(debug, "Returning immediate response from processor"); + sendImmediateResponse(response->immediate_response()); message_valid = true; } request_state_ = FilterState::IDLE; @@ -129,6 +130,23 @@ void Filter::onGrpcClose() { } } +void Filter::sendImmediateResponse(const ImmediateResponse& response) { + const auto status_code = response.has_status() ? response.status().code() : 200; + const auto grpc_status = + response.has_grpc_status() + ? absl::optional(response.grpc_status().status()) + : absl::nullopt; + + decoder_callbacks_->sendLocalReply( + static_cast(status_code), response.body(), + [&response](Http::ResponseHeaderMap& headers) { + if (response.has_headers()) { + MutationUtils::applyHeaderMutations(response.headers(), headers); + } + }, + grpc_status, response.details()); +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 5ccfb49453df..299780f3e05a 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -6,6 +6,7 @@ #include "envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.pb.h" #include "envoy/grpc/async_client.h" #include "envoy/http/filter.h" +#include "envoy/service/ext_proc/v3alpha/external_processor.pb.h" #include "envoy/stats/scope.h" #include "envoy/stats/stats_macros.h" @@ -100,6 +101,7 @@ class Filter : public Logger::Loggable, private: void closeStream(); + void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response); const FilterConfigSharedPtr config_; const ExternalProcessorClientPtr client_; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 0e921e71bb09..f0d8604645bf 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -19,6 +19,10 @@ using Extensions::HttpFilters::ExternalProcessing::ExtProcTestUtility; using Http::LowerCaseString; +// These tests exercise the ext_proc filter through Envoy's integration test +// environment by configuring an instance of the Envoy server and driving it +// through the mock network stack. + class ExtProcIntegrationTest : public HttpIntegrationTest, public Grpc::GrpcClientIntegrationParamTest { protected: @@ -59,6 +63,7 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_); config_helper_.addFilter(MessageUtil::getJsonStringFromMessage(ext_proc_filter)); }); + setDownstreamProtocol(Http::CodecClient::Type::HTTP2); } void waitForFirstMessage(ProcessingRequest& request) { @@ -75,9 +80,11 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, ExtProcIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// by immediately closing the stream. TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { initializeConfig(); - setDownstreamProtocol(Http::CodecClient::Type::HTTP2); HttpIntegrationTest::initialize(); auto conn = makeClientConnection(lookupPort("http")); @@ -109,9 +116,11 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { EXPECT_EQ("200", response->headers().getStatusValue()); } +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// by returning a failure before the first stream response can be sent. TEST_P(ExtProcIntegrationTest, GetAndFailStream) { initializeConfig(); - setDownstreamProtocol(Http::CodecClient::Type::HTTP2); HttpIntegrationTest::initialize(); auto conn = makeClientConnection(lookupPort("http")); @@ -130,9 +139,11 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStream) { EXPECT_EQ("500", response->headers().getStatusValue()); } +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// by requesting to modify the request headers. TEST_P(ExtProcIntegrationTest, GetAndSetHeaders) { initializeConfig(); - setDownstreamProtocol(Http::CodecClient::Type::HTTP2); HttpIntegrationTest::initialize(); auto conn = makeClientConnection(lookupPort("http")); @@ -188,4 +199,48 @@ TEST_P(ExtProcIntegrationTest, GetAndSetHeaders) { EXPECT_EQ("200", response->headers().getStatusValue()); } +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// by sending back an immediate_response message, which should be +// returned directly to the downstream. +TEST_P(ExtProcIntegrationTest, GetAndRespondImmediately) { + initializeConfig(); + HttpIntegrationTest::initialize(); + + auto conn = makeClientConnection(lookupPort("http")); + codec_client_ = makeHttpConnection(std::move(conn)); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + auto response = codec_client_->makeHeaderOnlyRequest(headers); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(request_headers_msg); + + EXPECT_TRUE(request_headers_msg.has_request_headers()); + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + + // Produce an immediate response + ProcessingResponse response_msg; + auto* immediate_response = response_msg.mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::Unauthorized); + immediate_response->set_body("{\"reason\": \"Not authorized\"}"); + immediate_response->set_details("Failed because you are not authorized"); + auto* hdr1 = immediate_response->mutable_headers()->add_set_headers(); + hdr1->mutable_header()->set_key("x-failure-reason"); + hdr1->mutable_header()->set_value("testing"); + auto* hdr2 = immediate_response->mutable_headers()->add_set_headers(); + hdr2->mutable_header()->set_key("content-type"); + hdr2->mutable_header()->set_value("application/json"); + processor_stream_->sendGrpcMessage(response_msg); + + response->waitForEndStream(); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("401", response->headers().getStatusValue()); + EXPECT_EQ( + "testing", + response->headers().get(LowerCaseString("x-failure-reason"))[0]->value().getStringView()); + EXPECT_EQ("application/json", response->headers().ContentType()->value().getStringView()); + EXPECT_EQ("{\"reason\": \"Not authorized\"}", response->body()); +} + } // namespace Envoy \ No newline at end of file diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 69968a23c886..bbd43d1f240d 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -29,10 +29,15 @@ using Http::FilterHeadersStatus; using Http::FilterTrailersStatus; using Http::LowerCaseString; +using testing::Eq; using testing::Invoke; +using testing::Unused; using namespace std::chrono_literals; +// These tests are all unit tests that directly drive an instance of the +// ext_proc filter and verify the behavior using mocks. + class HttpFilterTest : public testing::Test { protected: void initialize(std::string&& yaml) { @@ -81,8 +86,8 @@ class HttpFilterTest : public testing::Test { NiceMock stats_store_; FilterConfigSharedPtr config_; std::unique_ptr filter_; - NiceMock decoder_callbacks_; - NiceMock encoder_callbacks_; + Http::MockStreamDecoderFilterCallbacks decoder_callbacks_; + Http::MockStreamEncoderFilterCallbacks encoder_callbacks_; Http::TestRequestHeaderMapImpl request_headers_; Http::TestResponseHeaderMapImpl response_headers_; Http::TestRequestTrailerMapImpl request_trailers_; @@ -90,6 +95,8 @@ class HttpFilterTest : public testing::Test { Buffer::OwnedImpl data_; }; +// Using the default configuration, test the filter with a processor that +// replies to the request_headers message with an empty response TEST_F(HttpFilterTest, SimplestPost) { initialize(R"EOF( grpc_service: @@ -151,6 +158,9 @@ TEST_F(HttpFilterTest, SimplestPost) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Using the default configuration, test the filter with a processor that +// replies to the request_headers message with a message that modifies the request +// headers. TEST_F(HttpFilterTest, PostAndChangeHeaders) { initialize(R"EOF( grpc_service: @@ -185,22 +195,14 @@ TEST_F(HttpFilterTest, PostAndChangeHeaders) { stream_callbacks_->onReceiveMessage(std::move(resp1)); // We should now have changed the original header a bit - request_headers_.iterate([](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate { - std::cerr << e.key().getStringView() << ": " << e.value().getStringView() << '\n'; - return Http::HeaderMap::Iterate::Continue; - }); - auto get1 = request_headers_.get(LowerCaseString("x-new-header")); - EXPECT_EQ(get1.size(), 1); - EXPECT_EQ(get1[0]->key(), "x-new-header"); - EXPECT_EQ(get1[0]->value(), "new"); - auto get2 = request_headers_.get(LowerCaseString("x-some-other-header")); - EXPECT_EQ(get2.size(), 2); - EXPECT_EQ(get2[0]->key(), "x-some-other-header"); - EXPECT_EQ(get2[0]->value(), "yes"); - EXPECT_EQ(get2[1]->key(), "x-some-other-header"); - EXPECT_EQ(get2[1]->value(), "no"); - auto get3 = request_headers_.get(LowerCaseString("x-do-we-want-this")); - EXPECT_TRUE(get3.empty()); + Http::TestRequestHeaderMapImpl expected{{":path", "/"}, + {":method", "POST"}, + {":scheme", "http"}, + {"host", "host"}, + {"x-new-header", "new"}, + {"x-some-other-header", "yes"}, + {"x-some-other-header", "no"}}; + EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(expected, request_headers_)); data_.add("foo"); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); @@ -221,6 +223,10 @@ TEST_F(HttpFilterTest, PostAndChangeHeaders) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Using the default configuration, test the filter with a processor that +// replies to the request_headers message with an "immediate response" message +// that should result in a response being directly sent downstream with +// custom headers. TEST_F(HttpFilterTest, PostAndRespondImmediately) { initialize(R"EOF( grpc_service: @@ -233,19 +239,39 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) { EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, filter_->decodeHeaders(request_headers_, false)); + Http::TestResponseHeaderMapImpl immediate_response_headers; EXPECT_CALL(decoder_callbacks_, continueDecoding()); + EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::BadRequest, "Bad request", _, + Eq(absl::nullopt), "Got a bad request")) + .WillOnce(Invoke([&immediate_response_headers]( + Unused, Unused, + std::function modify_headers, + Unused, Unused) { modify_headers(immediate_response_headers); })); std::unique_ptr resp1 = std::make_unique(); auto* immediate_response = resp1->mutable_immediate_response(); immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); immediate_response->set_body("Bad request"); immediate_response->set_details("Got a bad request"); + auto* immediate_headers = immediate_response->mutable_headers(); + auto* hdr1 = immediate_headers->add_set_headers(); + hdr1->mutable_header()->set_key("content-type"); + hdr1->mutable_header()->set_value("text/plain"); + auto* hdr2 = immediate_headers->add_set_headers(); + hdr2->mutable_append()->set_value(true); + hdr2->mutable_header()->set_key("x-another-thing"); + hdr2->mutable_header()->set_value("1"); + auto* hdr3 = immediate_headers->add_set_headers(); + hdr3->mutable_append()->set_value(true); + hdr3->mutable_header()->set_key("x-another-thing"); + hdr3->mutable_header()->set_value("2"); stream_callbacks_->onReceiveMessage(std::move(resp1)); - - // Immediate response processing not yet implemented -- all we can expect - // at this point is that continueDecoding is called and that the - // stream is not yet closed. EXPECT_FALSE(stream_close_sent_); + Http::TestResponseHeaderMapImpl expected_response_headers{ + {"content-type", "text/plain"}, {"x-another-thing", "1"}, {"x-another-thing", "2"}}; + EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(expected_response_headers, + immediate_response_headers)); + data_.add("foo"); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); @@ -265,6 +291,99 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Using the default configuration, test the filter with a processor that +// replies to the request_headers message with an empty immediate_response message +TEST_F(HttpFilterTest, RespondImmediatelyDefault) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + Http::TestResponseHeaderMapImpl immediate_response_headers; + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::OK, "", _, Eq(absl::nullopt), "")) + .WillOnce(Invoke([&immediate_response_headers]( + Unused, Unused, + std::function modify_headers, + Unused, Unused) { modify_headers(immediate_response_headers); })); + std::unique_ptr resp1 = std::make_unique(); + /*auto* immediate_response = */ resp1->mutable_immediate_response(); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + EXPECT_FALSE(stream_close_sent_); + + Http::TestResponseHeaderMapImpl expected_response_headers{}; + EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(expected_response_headers, + immediate_response_headers)); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + EXPECT_TRUE(stream_close_sent_); +} + +// Using the default configuration, test the filter with a processor that +// replies to the request_headers message with an immediate response message +// that contains a non-default gRPC status. +TEST_F(HttpFilterTest, RespondImmediatelyGrpcError) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + )EOF"); + + HttpTestUtility::addDefaultHeaders(request_headers_, "POST"); + + EXPECT_EQ(FilterHeadersStatus::StopAllIterationAndWatermark, + filter_->decodeHeaders(request_headers_, false)); + + Http::TestResponseHeaderMapImpl immediate_response_headers; + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::Forbidden, "", _, Eq(999), "")) + .WillOnce(Invoke([&immediate_response_headers]( + Unused, Unused, + std::function modify_headers, + Unused, Unused) { modify_headers(immediate_response_headers); })); + std::unique_ptr resp1 = std::make_unique(); + auto* immediate_response = resp1->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::Forbidden); + immediate_response->mutable_grpc_status()->set_status(999); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + EXPECT_FALSE(stream_close_sent_); + + Http::TestResponseHeaderMapImpl expected_response_headers{}; + EXPECT_TRUE(TestUtility::headerMapEqualIgnoreOrder(expected_response_headers, + immediate_response_headers)); + + data_.add("foo"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); + + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encode100ContinueHeaders(response_headers_)); + EXPECT_EQ(FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers_, true)); + data_.add("bar"); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(data_, true)); + EXPECT_EQ(FilterTrailersStatus::Continue, filter_->encodeTrailers(response_trailers_)); + filter_->onDestroy(); + EXPECT_TRUE(stream_close_sent_); +} + +// Using the default configuration, test the filter with a processor that +// returns an error from from the gRPC stream. TEST_F(HttpFilterTest, PostAndFail) { initialize(R"EOF( grpc_service: @@ -281,7 +400,11 @@ TEST_F(HttpFilterTest, PostAndFail) { EXPECT_FALSE(stream_close_sent_); // Oh no! The remote server had a failure! - EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _)); + EXPECT_CALL(decoder_callbacks_, + sendLocalReply(Http::Code::InternalServerError, "", Eq(nullptr), Eq(absl::nullopt), + "ext_proc error: gRPC error 13")); + // In this case, this call includes header encoding + EXPECT_CALL(decoder_callbacks_, encodeHeaders_(_, _)); stream_callbacks_->onGrpcError(Grpc::Status::Internal); data_.add("foo"); @@ -303,6 +426,9 @@ TEST_F(HttpFilterTest, PostAndFail) { EXPECT_EQ(1, config_->stats().streams_failed_.value()); } +// Using the default configuration, test the filter with a processor that +// returns an error from the gRPC stream that is ignored because the +// failure_mode_allow parameter is set. TEST_F(HttpFilterTest, PostAndIgnoreFailure) { initialize(R"EOF( grpc_service: @@ -343,6 +469,8 @@ TEST_F(HttpFilterTest, PostAndIgnoreFailure) { EXPECT_EQ(1, config_->stats().failure_mode_allowed_.value()); } +// Using the default configuration, test the filter with a processor that +// replies to the request_headers message by closing the gRPC stream. TEST_F(HttpFilterTest, PostAndClose) { initialize(R"EOF( grpc_service: @@ -385,6 +513,10 @@ TEST_F(HttpFilterTest, PostAndClose) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Using the default configuration, test the filter with a processor that +// replies to the request_headers message incorrectly by sending a +// request_body message, which should result in the stream being closed +// and ignored. TEST_F(HttpFilterTest, OutOfOrder) { initialize(R"EOF( grpc_service: