diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/BUILD b/api/envoy/extensions/filters/http/ext_proc/v3/BUILD index 8322f99fa7df..5bfeeda1b7b8 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/BUILD +++ b/api/envoy/extensions/filters/http/ext_proc/v3/BUILD @@ -10,5 +10,6 @@ api_proto_package( "//envoy/config/core/v3:pkg", "//envoy/type/matcher/v3:pkg", "@com_github_cncf_xds//udpa/annotations:pkg", + "@com_github_cncf_xds//xds/annotations/v3:pkg", ], ) diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index 13a24ad9fcd7..83b15731b901 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -12,6 +12,8 @@ import "envoy/type/matcher/v3/string.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/struct.proto"; +import "xds/annotations/v3/status.proto"; + import "udpa/annotations/migrate.proto"; import "udpa/annotations/status.proto"; import "validate/validate.proto"; @@ -131,8 +133,39 @@ message ExternalProcessor { // Only one of ``http_service`` or // :ref:`grpc_service `. // can be set. It is required that one of them must be set. - ExtProcHttpService http_service = 20 - [(udpa.annotations.field_migrate).oneof_promotion = "ext_proc_service_type"]; + // + // If ``http_service`` is set, the + // :ref:`processing_mode ` + // can not be configured to send any body or trailers. i.e, http_service only supports + // sending request or response headers to the side stream server. + // + // With this configuration, Envoy behavior: + // + // 1. The headers are first put in a proto message + // :ref:`ProcessingRequest `. + // + // 2. This proto message is then transcoded into a JSON text. + // + // 3. Envoy then sends a HTTP POST message with content-type as "application/json", + // and this JSON text as body to the side stream server. + // + // After the side-stream receives this HTTP request message, it is expected to do as follows: + // + // 1. It converts the body, which is a JSON string, into a ``ProcessingRequest`` + // proto message to examine and mutate the headers. + // + // 2. It then sets the mutated headers into a new proto message + // :ref:`ProcessingResponse `. + // + // 3. It converts ``ProcessingResponse`` proto message into a JSON text. + // + // 4. It then sends a HTTP response back to Envoy with status code as "200", + // content-type as "application/json" and sets the JSON text as the body. + // + ExtProcHttpService http_service = 20 [ + (udpa.annotations.field_migrate).oneof_promotion = "ext_proc_service_type", + (xds.annotations.v3.field_status).work_in_progress = true + ]; // By default, if the gRPC stream cannot be established, or if it is closed // prematurely with an error, the filter will fail. Specifically, if the diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index c6cb55ffb276..2fafe8df478b 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -9,6 +9,15 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() +envoy_cc_library( + name = "client_base_interface", + hdrs = ["client_base.h"], + tags = ["skip_on_windows"], + deps = [ + "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", + ], +) + envoy_cc_library( name = "ext_proc", srcs = [ @@ -21,6 +30,7 @@ envoy_cc_library( ], tags = ["skip_on_windows"], deps = [ + ":client_base_interface", ":client_interface", ":matching_utils_lib", ":mutation_utils_lib", @@ -34,6 +44,7 @@ envoy_cc_library( "//source/common/runtime:runtime_features_lib", "//source/extensions/filters/common/mutation_rules:mutation_rules_lib", "//source/extensions/filters/http/common:pass_through_filter_lib", + "//source/extensions/filters/http/ext_proc/http_client:http_client_lib", "@com_google_absl//absl/status", "@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/strings:string_view", @@ -53,6 +64,7 @@ envoy_cc_extension( ":client_lib", ":ext_proc", "//source/extensions/filters/http/common:factory_base_lib", + "//source/extensions/filters/http/ext_proc/http_client:http_client_lib", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", ], ) @@ -62,6 +74,7 @@ envoy_cc_library( hdrs = ["client.h"], tags = ["skip_on_windows"], deps = [ + ":client_base_interface", "//envoy/grpc:async_client_manager_interface", "//envoy/grpc:status", "//envoy/stream_info:stream_info_interface", diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 413bbcac7730..d60f417051e6 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -10,13 +10,14 @@ #include "envoy/stream_info/stream_info.h" #include "source/common/http/sidestream_watermark.h" +#include "source/extensions/filters/http/ext_proc/client_base.h" namespace Envoy { namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -class ExternalProcessorStream { +class ExternalProcessorStream : public StreamBase { public: virtual ~ExternalProcessorStream() = default; virtual void send(envoy::service::ext_proc::v3::ProcessingRequest&& request, @@ -30,7 +31,7 @@ class ExternalProcessorStream { using ExternalProcessorStreamPtr = std::unique_ptr; -class ExternalProcessorCallbacks { +class ExternalProcessorCallbacks : public RequestCallbacks { public: virtual ~ExternalProcessorCallbacks() = default; virtual void onReceiveMessage( @@ -40,7 +41,7 @@ class ExternalProcessorCallbacks { virtual void logGrpcStreamInfo() PURE; }; -class ExternalProcessorClient { +class ExternalProcessorClient : public ClientBase { public: virtual ~ExternalProcessorClient() = default; virtual ExternalProcessorStreamPtr @@ -48,8 +49,6 @@ class ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE; - virtual ExternalProcessorStream* stream() PURE; - virtual void setStream(ExternalProcessorStream* stream) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_base.h b/source/extensions/filters/http/ext_proc/client_base.h new file mode 100644 index 000000000000..d37fd0c1f512 --- /dev/null +++ b/source/extensions/filters/http/ext_proc/client_base.h @@ -0,0 +1,47 @@ +#pragma once + +#include + +#include "envoy/service/ext_proc/v3/external_processor.pb.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +/** + * Async callbacks used during external processing. + */ +class RequestCallbacks { +public: + virtual ~RequestCallbacks() = default; + virtual void onComplete(envoy::service::ext_proc::v3::ProcessingResponse& response) PURE; + virtual void onError() PURE; +}; + +/** + * Stream base class used during external processing. + */ +class StreamBase { +public: + virtual ~StreamBase() = default; +}; + +/** + * Async client base class used during external processing. + */ +class ClientBase { +public: + virtual ~ClientBase() = default; + virtual void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& request, + bool end_stream, const uint64_t stream_id, RequestCallbacks* callbacks, + StreamBase* stream) PURE; + virtual void cancel() PURE; +}; + +using ClientBasePtr = std::unique_ptr; + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/ext_proc/client_impl.cc b/source/extensions/filters/http/ext_proc/client_impl.cc index fef44968dc5d..b52c91969906 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.cc +++ b/source/extensions/filters/http/ext_proc/client_impl.cc @@ -24,6 +24,13 @@ ExternalProcessorStreamPtr ExternalProcessorClientImpl::start( sidestream_watermark_callbacks); } +void ExternalProcessorClientImpl::sendRequest( + envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream, const uint64_t, + RequestCallbacks*, StreamBase* stream) { + ExternalProcessorStream* grpc_stream = dynamic_cast(stream); + grpc_stream->send(std::move(request), end_stream); +} + ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create( Grpc::AsyncClient&& client, ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options, diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 8ef177cda00c..d4ec819c7391 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -32,15 +32,14 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override; - ExternalProcessorStream* stream() override { return stream_; } - void setStream(ExternalProcessorStream* stream) override { stream_ = stream; } + void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream, + const uint64_t stream_id, RequestCallbacks* callbacks, + StreamBase* stream) override; + void cancel() override {} private: Grpc::AsyncClientManager& client_manager_; Stats::Scope& scope_; - // The gRPC stream to the external processor, which will be opened - // when it's time to send the first message. - ExternalProcessorStream* stream_ = nullptr; }; class ExternalProcessorStreamImpl : public ExternalProcessorStream, diff --git a/source/extensions/filters/http/ext_proc/config.cc b/source/extensions/filters/http/ext_proc/config.cc index 134fa8d190b2..076325b2d765 100644 --- a/source/extensions/filters/http/ext_proc/config.cc +++ b/source/extensions/filters/http/ext_proc/config.cc @@ -3,6 +3,7 @@ #include "source/extensions/filters/common/expr/evaluator.h" #include "source/extensions/filters/http/ext_proc/client_impl.h" #include "source/extensions/filters/http/ext_proc/ext_proc.h" +#include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h" namespace Envoy { namespace Extensions { @@ -22,15 +23,22 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped( proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms, dual_info.scope, stats_prefix, dual_info.is_upstream, Envoy::Extensions::Filters::Common::Expr::getBuilder(context), context); - - return [filter_config = std::move(filter_config), grpc_service = proto_config.grpc_service(), - &context, dual_info](Http::FilterChainFactoryCallbacks& callbacks) { - auto client = std::make_unique( - context.clusterManager().grpcAsyncClientManager(), dual_info.scope); - - callbacks.addStreamFilter(Http::StreamFilterSharedPtr{ - std::make_shared(filter_config, std::move(client), grpc_service)}); - }; + if (proto_config.has_grpc_service()) { + return [filter_config = std::move(filter_config), &context, + dual_info](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique( + context.clusterManager().grpcAsyncClientManager(), dual_info.scope); + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); + }; + } else { + return [proto_config = std::move(proto_config), filter_config = std::move(filter_config), + &context](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique(proto_config, context); + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); + }; + } } Router::RouteSpecificFilterConfigConstSharedPtr @@ -54,14 +62,22 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp server_context.scope(), stats_prefix, false, Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context), server_context); - return [filter_config = std::move(filter_config), grpc_service = proto_config.grpc_service(), - &server_context](Http::FilterChainFactoryCallbacks& callbacks) { - auto client = std::make_unique( - server_context.clusterManager().grpcAsyncClientManager(), server_context.scope()); - - callbacks.addStreamFilter(Http::StreamFilterSharedPtr{ - std::make_shared(filter_config, std::move(client), grpc_service)}); - }; + if (proto_config.has_grpc_service()) { + return [filter_config = std::move(filter_config), + &server_context](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique( + server_context.clusterManager().grpcAsyncClientManager(), server_context.scope()); + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); + }; + } else { + return [proto_config = std::move(proto_config), filter_config = std::move(filter_config), + &server_context](Http::FilterChainFactoryCallbacks& callbacks) { + auto client = std::make_unique(proto_config, server_context); + callbacks.addStreamFilter( + Http::StreamFilterSharedPtr{std::make_shared(filter_config, std::move(client))}); + }; + } } LEGACY_REGISTER_FACTORY(ExternalProcessingFilterConfig, diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 160f51910ff3..e087cdb1596f 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -9,6 +9,7 @@ #include "source/common/http/utility.h" #include "source/common/protobuf/utility.h" #include "source/common/runtime/runtime_features.h" +#include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h" #include "source/extensions/filters/http/ext_proc/mutation_utils.h" #include "absl/strings/str_format.h" @@ -21,6 +22,7 @@ namespace ExternalProcessing { namespace { using envoy::config::common::mutation_rules::v3::HeaderMutationRules; +using envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor; using envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute; using envoy::extensions::filters::http::ext_proc::v3::ProcessingMode; using envoy::type::v3::StatusCode; @@ -49,6 +51,19 @@ absl::optional initProcessingMode(const ExtProcPerRoute& config) return absl::nullopt; } +absl::optional +getFilterGrpcService(const ExternalProcessor& config) { + if (config.has_grpc_service() != config.has_http_service()) { + if (config.has_grpc_service()) { + return config.grpc_service(); + } + } else { + throw EnvoyException("One and only one of grpc_service or http_service must be configured"); + } + + return absl::nullopt; +} + absl::optional initGrpcService(const ExtProcPerRoute& config) { if (config.has_overrides() && config.overrides().has_grpc_service()) { @@ -177,18 +192,19 @@ ProcessingMode allDisabledMode() { } // namespace -FilterConfig::FilterConfig( - const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config, - const std::chrono::milliseconds message_timeout, const uint32_t max_message_timeout_ms, - Stats::Scope& scope, const std::string& stats_prefix, bool is_upstream, - Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder, - Server::Configuration::CommonFactoryContext& context) +FilterConfig::FilterConfig(const ExternalProcessor& config, + const std::chrono::milliseconds message_timeout, + const uint32_t max_message_timeout_ms, Stats::Scope& scope, + const std::string& stats_prefix, bool is_upstream, + Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder, + Server::Configuration::CommonFactoryContext& context) : failure_mode_allow_(config.failure_mode_allow()), observability_mode_(config.observability_mode()), route_cache_action_(config.route_cache_action()), deferred_close_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, deferred_close_timeout, DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS)), message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms), + grpc_service_(getFilterGrpcService(config)), send_body_without_waiting_for_header_response_( config.send_body_without_waiting_for_header_response()), stats_(generateStats(stats_prefix, config.stat_prefix(), scope)), @@ -215,14 +231,22 @@ FilterConfig::FilterConfig( config.response_attributes()), immediate_mutation_checker_(context.regexEngine()), thread_local_stream_manager_slot_(context.threadLocal().allocateSlot()) { - if (config.disable_clear_route_cache() && - (route_cache_action_ != - envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor::DEFAULT)) { + if (!grpc_service_.has_value()) { + // In case http_service configured, the processing mode can only support sending headers. + if (processing_mode_.request_body_mode() != ProcessingMode::NONE || + processing_mode_.response_body_mode() != ProcessingMode::NONE || + processing_mode_.request_trailer_mode() == ProcessingMode::SEND || + processing_mode_.response_trailer_mode() == ProcessingMode::SEND) { + throw EnvoyException( + "If http_service is configured, processing modes can not send any body or trailer."); + } + } + if (config.disable_clear_route_cache() && (route_cache_action_ != ExternalProcessor::DEFAULT)) { throw EnvoyException("disable_clear_route_cache and route_cache_action can not " "be set to none-default at the same time."); } if (config.disable_clear_route_cache()) { - route_cache_action_ = envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor::RETAIN; + route_cache_action_ = ExternalProcessor::RETAIN; } thread_local_stream_manager_slot_->set( [](Envoy::Event::Dispatcher&) { return std::make_shared(); }); @@ -333,6 +357,44 @@ void Filter::setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callb watermark_callbacks_.setEncoderFilterCallbacks(&callbacks); } +void Filter::sendRequest(ProcessingRequest&& req, bool end_stream) { + // Calling the client send function to send the request. + client_->sendRequest(std::move(req), end_stream, filter_callbacks_->streamId(), this, stream_); +} + +void Filter::onComplete(ProcessingResponse& response) { + ENVOY_LOG(debug, "Received successful response from server"); + std::unique_ptr resp_ptr = std::make_unique(response); + onReceiveMessage(std::move(resp_ptr)); +} + +void Filter::onError() { + ENVOY_LOG(debug, "Received Error response from server"); + stats_.http_not_ok_resp_received_.inc(); + + if (processing_complete_) { + ENVOY_LOG(debug, "Ignoring stream message received after processing complete"); + return; + } + + if (config_->failureModeAllow()) { + // The user would like a none-200-ok response to not cause message processing to fail. + // Close the external processing. + processing_complete_ = true; + stats_.failure_mode_allowed_.inc(); + clearAsyncState(); + } else { + // Return an error and stop processing the current stream. + processing_complete_ = true; + decoding_state_.onFinishProcessorCall(Grpc::Status::Aborted); + encoding_state_.onFinishProcessorCall(Grpc::Status::Aborted); + ImmediateResponse errorResponse; + errorResponse.mutable_status()->set_code(StatusCode::InternalServerError); + errorResponse.set_details(absl::StrCat(ErrorPrefix, "_HTTP_ERROR")); + sendImmediateResponse(errorResponse); + } +} + Filter::StreamOpenState Filter::openStream() { // External processing is completed. This means there is no need to send any further // message to the server for processing. Just return IgnoreError so the filter @@ -341,7 +403,12 @@ Filter::StreamOpenState Filter::openStream() { ENVOY_LOG(debug, "External processing is completed when trying to open the gRPC stream"); return StreamOpenState::IgnoreError; } - if (!client_->stream()) { + + if (!config().grpcService().has_value()) { + return StreamOpenState::Ok; + } + + if (!stream_) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); Http::AsyncClient::ParentContext grpc_context; @@ -351,8 +418,9 @@ Filter::StreamOpenState Filter::openStream() { .setParentContext(grpc_context) .setBufferBodyForRetry(grpc_service_.has_retry_policy()); + ExternalProcessorClient* grpc_client = dynamic_cast(client_.get()); ExternalProcessorStreamPtr stream_object = - client_->start(*this, config_with_hash_key_, options, watermark_callbacks_); + grpc_client->start(*this, config_with_hash_key_, options, watermark_callbacks_); if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called @@ -363,26 +431,29 @@ Filter::StreamOpenState Filter::openStream() { } stats_.streams_started_.inc(); - ExternalProcessorStream* stream = config_->threadLocalStreamManager().store( - std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); - client_->setStream(stream); + stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(), + config_->deferredCloseTimeout()); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not // have a proper implementation of streamInfo. if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) { - logging_info_->setClusterInfo(client_->stream()->streamInfo().upstreamClusterInfo()); + logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo()); } } return StreamOpenState::Ok; } void Filter::closeStream() { - if (client_->stream()) { + if (!config_->grpcService().has_value()) { + return; + } + + if (stream_) { ENVOY_LOG(debug, "Calling close on stream"); - if (client_->stream()->close()) { + if (stream_->close()) { stats_.streams_closed_.inc(); } - config_->threadLocalStreamManager().erase(client_->stream()); - client_->setStream(nullptr); + config_->threadLocalStreamManager().erase(stream_); + stream_ = nullptr; } else { ENVOY_LOG(debug, "Stream already closed"); } @@ -390,8 +461,7 @@ void Filter::closeStream() { void Filter::deferredCloseStream() { ENVOY_LOG(debug, "Calling deferred close on stream"); - config_->threadLocalStreamManager().deferredErase(client_->stream(), - filter_callbacks_->dispatcher()); + config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); } void Filter::onDestroy() { @@ -402,6 +472,11 @@ void Filter::onDestroy() { decoding_state_.stopMessageTimer(); encoding_state_.stopMessageTimer(); + if (!config_->grpcService().has_value()) { + client_->cancel(); + return; + } + if (config_->observabilityMode()) { // In observability mode where the main stream processing and side stream processing are // asynchronous, it is possible that filter instance is destroyed before the side stream request @@ -409,8 +484,8 @@ void Filter::onDestroy() { // closure is deferred upon filter destruction with a timer. // First, release the referenced filter resource. - if (client_->stream() != nullptr) { - client_->stream()->notifyFilterDestroy(); + if (stream_ != nullptr) { + stream_->notifyFilterDestroy(); } // Second, perform stream deferred closure. @@ -440,7 +515,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); state.setPaused(true); return FilterHeadersStatus::StopIteration; @@ -673,7 +748,7 @@ Filter::sendHeadersInObservabilityMode(Http::RequestOrResponseHeaderMap& headers ProcessingRequest req = buildHeaderRequest(state, headers, end_stream, /*observability_mode=*/true); ENVOY_LOG(debug, "Sending headers message in observability mode"); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); return FilterHeadersStatus::Continue; @@ -698,7 +773,7 @@ Http::FilterDataStatus Filter::sendDataInObservabilityMode(Buffer::Instance& dat // Set up the the body chunk and send. auto req = setupBodyChunk(state, data, end_stream); req.set_observability_mode(true); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); ENVOY_LOG(debug, "Sending body message in ObservabilityMode"); } else if (state.bodyMode() != ProcessingMode::NONE) { @@ -890,7 +965,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState ProcessingRequest& req) { state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), new_state); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); } @@ -906,21 +981,24 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::TrailersCallback); ENVOY_LOG(debug, "Sending trailers message"); - client_->stream()->send(std::move(req), false); + sendRequest(std::move(req), false); stats_.stream_msgs_sent_.inc(); } void Filter::logGrpcStreamInfo() { - if (client_->stream() != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { - const auto& upstream_meter = client_->stream()->streamInfo().getUpstreamBytesMeter(); + if (!config().grpcService().has_value()) { + return; + } + + if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { + const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter(); if (upstream_meter != nullptr) { logging_info_->setBytesSent(upstream_meter->wireBytesSent()); logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); } // Only set upstream host in logging info once. if (logging_info_->upstreamHost() == nullptr) { - logging_info_->setUpstreamHost( - client_->stream()->streamInfo().upstreamInfo()->upstreamHost()); + logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); } } } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index ce5a9ed4b195..240ffc505e71 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -24,6 +24,7 @@ #include "source/extensions/filters/common/mutation_rules/mutation_rules.h" #include "source/extensions/filters/http/common/pass_through_filter.h" #include "source/extensions/filters/http/ext_proc/client.h" +#include "source/extensions/filters/http/ext_proc/client_base.h" #include "source/extensions/filters/http/ext_proc/matching_utils.h" #include "source/extensions/filters/http/ext_proc/processor_state.h" @@ -47,7 +48,8 @@ namespace ExternalProcessing { COUNTER(clear_route_cache_ignored) \ COUNTER(clear_route_cache_disabled) \ COUNTER(clear_route_cache_upstream_ignored) \ - COUNTER(send_immediate_resp_upstream_ignored) + COUNTER(send_immediate_resp_upstream_ignored) \ + COUNTER(http_not_ok_resp_received) struct ExtProcFilterStats { ALL_EXT_PROC_FILTER_STATS(GENERATE_COUNTER_STRUCT) @@ -274,6 +276,10 @@ class FilterConfig { return thread_local_stream_manager_slot_->getTyped(); } + const absl::optional grpcService() const { + return grpc_service_; + } + private: ExtProcFilterStats generateStats(const std::string& prefix, const std::string& filter_stats_prefix, Stats::Scope& scope) { @@ -287,6 +293,7 @@ class FilterConfig { const std::chrono::milliseconds deferred_close_timeout_; const std::chrono::milliseconds message_timeout_; const uint32_t max_message_timeout_ms_; + const absl::optional grpc_service_; const bool send_body_without_waiting_for_header_response_; ExtProcFilterStats stats_; @@ -379,10 +386,11 @@ class Filter : public Logger::Loggable, }; public: - Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client, - const envoy::config::core::v3::GrpcService& grpc_service) + Filter(const FilterConfigSharedPtr& config, ClientBasePtr&& client) : config_(config), client_(std::move(client)), stats_(config->stats()), - grpc_service_(grpc_service), config_with_hash_key_(grpc_service), + grpc_service_(config->grpcService().has_value() ? config->grpcService().value() + : envoy::config::core::v3::GrpcService()), + config_with_hash_key_(grpc_service_), decoding_state_(*this, config->processingMode(), config->untypedForwardingMetadataNamespaces(), config->typedForwardingMetadataNamespaces(), @@ -444,6 +452,8 @@ class Filter : public Logger::Loggable, const ProcessorState& encodingState() { return encoding_state_; } const ProcessorState& decodingState() { return decoding_state_; } + void onComplete(envoy::service::ext_proc::v3::ProcessingResponse& response) override; + void onError() override; private: void mergePerRouteConfig(); @@ -480,8 +490,10 @@ class Filter : public Logger::Loggable, buildHeaderRequest(ProcessorState& state, Http::RequestOrResponseHeaderMap& headers, bool end_stream, bool observability_mode); + void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& req, bool end_stream); + const FilterConfigSharedPtr config_; - const ExternalProcessorClientPtr client_; + const ClientBasePtr client_; ExtProcFilterStats stats_; ExtProcLoggingInfo* logging_info_; envoy::config::core::v3::GrpcService grpc_service_; @@ -491,6 +503,10 @@ class Filter : public Logger::Loggable, DecodingProcessorState decoding_state_; EncodingProcessorState encoding_state_; + // The gRPC stream to the external processor, which will be opened + // when it's time to send the first message. + ExternalProcessorStream* stream_ = nullptr; + // Set to true when no more messages need to be sent to the processor. // This happens when the processor has closed the stream, or when it has // failed. diff --git a/source/extensions/filters/http/ext_proc/http_client/BUILD b/source/extensions/filters/http/ext_proc/http_client/BUILD index 16afa5f69849..681afb3bfdbb 100644 --- a/source/extensions/filters/http/ext_proc/http_client/BUILD +++ b/source/extensions/filters/http/ext_proc/http_client/BUILD @@ -8,22 +8,17 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() -envoy_cc_library( - name = "client_base_interface", - hdrs = ["client_base.h"], - tags = ["skip_on_windows"], - deps = [], -) - envoy_cc_library( name = "http_client_lib", srcs = ["http_client_impl.cc"], hdrs = ["http_client_impl.h"], tags = ["skip_on_windows"], deps = [ - "client_base_interface", "//source/common/common:enum_to_int", + "//source/common/http:header_map_lib", "//source/common/http:utility_lib", - "//source/extensions/filters/http/ext_proc", + "//source/extensions/filters/http/ext_proc:client_base_interface", + "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", + "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", ], ) diff --git a/source/extensions/filters/http/ext_proc/http_client/client_base.h b/source/extensions/filters/http/ext_proc/http_client/client_base.h deleted file mode 100644 index fd9ae5ce7c7f..000000000000 --- a/source/extensions/filters/http/ext_proc/http_client/client_base.h +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#include - -namespace Envoy { -namespace Extensions { -namespace HttpFilters { -namespace ExternalProcessing { - -/** - * Async callbacks used during external processing. - */ -class RequestCallbacks { -public: - virtual ~RequestCallbacks() = default; - virtual void onComplete() PURE; -}; - -/** - * Async client base class used during external processing. - */ -class ClientBase { -public: - virtual ~ClientBase() = default; - - virtual void sendRequest() PURE; - virtual void cancel() PURE; -}; - -} // namespace ExternalProcessing -} // namespace HttpFilters -} // namespace Extensions -} // namespace Envoy diff --git a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc index abc172192389..b333ea513cfc 100644 --- a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc +++ b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc @@ -1,6 +1,8 @@ #include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h" #include "source/common/common/enum_to_int.h" +#include "source/common/http/header_map_impl.h" +#include "source/common/http/message_impl.h" #include "source/common/http/utility.h" namespace Envoy { @@ -8,13 +10,90 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { +namespace { +Http::RequestMessagePtr buildHttpRequest(absl::string_view uri, const uint64_t stream_id, + absl::string_view req_in_json) { + absl::string_view host, path; + Envoy::Http::Utility::extractHostPathFromUri(uri, host, path); + ENVOY_LOG_MISC(debug, " Ext_Proc HTTP client send request to uri {}, host {}, path {}", uri, host, + path); + + // Construct a HTTP POST message. + const Envoy::Http::HeaderValues& header_values = Envoy::Http::Headers::get(); + Http::RequestHeaderMapPtr headers = + Envoy::Http::createHeaderMap( + {{header_values.Method, "POST"}, + {header_values.Scheme, "http"}, + {header_values.Path, std::string(path)}, + {header_values.ContentType, "application/json"}, + {header_values.RequestId, std::to_string(stream_id)}, + {header_values.Host, std::string(host)}}); + Http::RequestMessagePtr message = + std::make_unique(std::move(headers)); + message->body().add(req_in_json); + return message; +} + +} // namespace + +void ExtProcHttpClient::sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& req, bool, + const uint64_t stream_id, RequestCallbacks* callbacks, + StreamBase*) { + // Cancel any active requests. + cancel(); + callbacks_ = callbacks; + + // Transcode req message into JSON string. + auto req_in_json = MessageUtil::getJsonStringFromMessage(req); + if (req_in_json.ok()) { + const auto http_uri = config_.http_service().http_service().http_uri(); + Http::RequestMessagePtr message = + buildHttpRequest(http_uri.uri(), stream_id, req_in_json.value()); + auto options = Http::AsyncClient::RequestOptions() + .setTimeout(std::chrono::milliseconds( + DurationUtil::durationToMilliseconds(http_uri.timeout()))) + .setSendInternal(false) + .setSendXff(false); + const std::string cluster = http_uri.cluster(); + const auto thread_local_cluster = context().clusterManager().getThreadLocalCluster(cluster); + if (thread_local_cluster) { + active_request_ = + thread_local_cluster->httpAsyncClient().send(std::move(message), *this, options); + } else { + ENVOY_LOG(error, "ext_proc cluster {} does not exist in the config", cluster); + } + } +} + void ExtProcHttpClient::onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) { auto status = Envoy::Http::Utility::getResponseStatusOrNullopt(response->headers()); + active_request_ = nullptr; if (status.has_value()) { uint64_t status_code = status.value(); if (status_code == Envoy::enumToInt(Envoy::Http::Code::OK)) { - ENVOY_LOG(error, "Response status is OK"); + std::string msg_body = response->body().toString(); + ENVOY_LOG(debug, "Response status is OK, message body length {}", msg_body.size()); + envoy::service::ext_proc::v3::ProcessingResponse response_msg; + if (!msg_body.empty()) { + bool has_unknown_field; + auto status = MessageUtil::loadFromJsonNoThrow(msg_body, response_msg, has_unknown_field); + if (!status.ok()) { + ENVOY_LOG( + error, + "The HTTP response body can not be decoded into a ProcessResponse proto message"); + onError(); + return; + } + } else { + ENVOY_LOG(error, "Response body is empty"); + onError(); + return; + } + if (callbacks_) { + callbacks_->onComplete(response_msg); + callbacks_ = nullptr; + } } else { ENVOY_LOG(error, "Response status is not OK, status: {}", status_code); onError(); @@ -31,12 +110,25 @@ void ExtProcHttpClient::onFailure(const Http::AsyncClient::Request&, ASSERT(reason == Http::AsyncClient::FailureReason::Reset || reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit); ENVOY_LOG(error, "Request failed: stream has been reset"); + active_request_ = nullptr; onError(); } void ExtProcHttpClient::onError() { // Cancel if the request is active. cancel(); + ENVOY_LOG(error, "ext_proc HTTP client error condition happens."); + if (callbacks_) { + callbacks_->onError(); + callbacks_ = nullptr; + } +} + +void ExtProcHttpClient::cancel() { + if (active_request_) { + active_request_->cancel(); + active_request_ = nullptr; + } } } // namespace ExternalProcessing diff --git a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h index fa2df5afd12d..6d9aace5ad1a 100644 --- a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h +++ b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h @@ -2,11 +2,12 @@ #include +#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" #include "envoy/http/async_client.h" +#include "envoy/service/ext_proc/v3/external_processor.pb.h" #include "source/common/common/logger.h" -#include "source/extensions/filters/http/ext_proc/ext_proc.h" -#include "source/extensions/filters/http/ext_proc/http_client/client_base.h" +#include "source/extensions/filters/http/ext_proc/client_base.h" namespace Envoy { namespace Extensions { @@ -23,8 +24,10 @@ class ExtProcHttpClient : public ClientBase, ~ExtProcHttpClient() { cancel(); } - void sendRequest() override {} - void cancel() override {} + void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& req, bool end_stream, + const uint64_t stream_id, RequestCallbacks* callbacks, + StreamBase* stream) override; + void cancel() override; void onBeforeFinalizeUpstreamSpan(Tracing::Span&, const Http::ResponseHeaderMap*) override {} // Http::AsyncClient::Callbacks implemented by this class. @@ -39,6 +42,8 @@ class ExtProcHttpClient : public ClientBase, void onError(); envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor config_; Server::Configuration::ServerFactoryContext& context_; + Http::AsyncClient::Request* active_request_{}; + RequestCallbacks* callbacks_{}; }; } // namespace ExternalProcessing diff --git a/test/extensions/filters/http/ext_proc/client_test.cc b/test/extensions/filters/http/ext_proc/client_test.cc index 590cd2e05e0f..6e910c367ca5 100644 --- a/test/extensions/filters/http/ext_proc/client_test.cc +++ b/test/extensions/filters/http/ext_proc/client_test.cc @@ -66,6 +66,8 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback void onGrpcClose() override { grpc_closed_ = true; } void logGrpcStreamInfo() override {} + void onComplete(envoy::service::ext_proc::v3::ProcessingResponse&) override {} + void onError() override {} std::unique_ptr last_response_; Grpc::Status::GrpcStatus grpc_status_ = Grpc::Status::WellKnownGrpcStatus::Ok; diff --git a/test/extensions/filters/http/ext_proc/config_test.cc b/test/extensions/filters/http/ext_proc/config_test.cc index 5c5196af6551..b635fc9e3030 100644 --- a/test/extensions/filters/http/ext_proc/config_test.cc +++ b/test/extensions/filters/http/ext_proc/config_test.cc @@ -60,7 +60,7 @@ TEST(HttpExtProcConfigTest, CorrectConfig) { cb(filter_callback); } -TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { +TEST(HttpExtProcConfigTest, CorrectGrpcServiceConfigServerContext) { std::string yaml = R"EOF( grpc_service: google_grpc: @@ -106,6 +106,33 @@ TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { cb(filter_callback); } +TEST(HttpExtProcConfigTest, CorrectHttpServiceConfigServerContext) { + std::string yaml = R"EOF( + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + failure_mode_allow: true + processing_mode: + request_header_mode: send + )EOF"; + + ExternalProcessingFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyConfigProto(); + TestUtility::loadFromYaml(yaml, *proto_config); + + testing::NiceMock context; + EXPECT_CALL(context, messageValidationVisitor()); + Http::FilterFactoryCb cb = + factory.createFilterFactoryFromProtoWithServerContext(*proto_config, "stats", context); + Http::MockFilterChainFactoryCallbacks filter_callback; + EXPECT_CALL(filter_callback, addStreamFilter(_)); + cb(filter_callback); +} + TEST(HttpExtProcConfigTest, CorrectRouteMetadataOnlyConfig) { std::string yaml = R"EOF( overrides: diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index edb3c2b66492..148d47e2de26 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -135,7 +135,7 @@ class HttpFilterTest : public testing::Test { std::make_shared( Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), factory_context_); - filter_ = std::make_unique(config_, std::move(client_), proto_config.grpc_service()); + filter_ = std::make_unique(config_, std::move(client_)); filter_->setEncoderFilterCallbacks(encoder_callbacks_); EXPECT_CALL(encoder_callbacks_, encoderBufferLimit()).WillRepeatedly(Return(BufferSize)); filter_->setDecoderFilterCallbacks(decoder_callbacks_); @@ -2750,6 +2750,93 @@ TEST_F(HttpFilterTest, ProcessingModeResponseHeadersOnlyWithoutCallingDecodeHead EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +TEST_F(HttpFilterTest, GrpcServiceHttpServiceBothSet) { + std::string yaml = R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + processing_mode: + response_body_mode: "BUFFERED" + )EOF"; + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config{}; + TestUtility::loadFromYaml(yaml, proto_config); + EXPECT_THROW_WITH_MESSAGE( + { + auto config = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", false, + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + factory_context_); + }, + EnvoyException, "One and only one of grpc_service or http_service must be configured"); +} + +TEST_F(HttpFilterTest, HttpServiceBodyProcessingModeNotNone) { + std::string yaml = R"EOF( + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + processing_mode: + response_header_mode: "SEND" + request_body_mode: "BUFFERED" + )EOF"; + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config{}; + TestUtility::loadFromYaml(yaml, proto_config); + EXPECT_THROW_WITH_MESSAGE( + { + auto config = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", false, + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + factory_context_); + }, + EnvoyException, + "If http_service is configured, processing modes can not send any body or trailer."); +} + +TEST_F(HttpFilterTest, HttpServiceTrailerProcessingModeNotSKIP) { + std::string yaml = R"EOF( + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + processing_mode: + request_body_mode: "NONE" + response_body_mode: "NONE" + request_trailer_mode: "SKIP" + response_trailer_mode: "SEND" + )EOF"; + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config{}; + TestUtility::loadFromYaml(yaml, proto_config); + EXPECT_THROW_WITH_MESSAGE( + { + auto config = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", false, + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + factory_context_); + }, + EnvoyException, + "If http_service is configured, processing modes can not send any body or trailer."); +} + // Using the default configuration, verify that the "clear_route_cache" flag makes the appropriate // callback on the filter for inbound traffic when header modifications are also present. // Also verify it does not make the callback for outbound traffic. diff --git a/test/extensions/filters/http/ext_proc/http_client/BUILD b/test/extensions/filters/http/ext_proc/http_client/BUILD index 0cc7d2bdab7b..1a073794cc2c 100644 --- a/test/extensions/filters/http/ext_proc/http_client/BUILD +++ b/test/extensions/filters/http/ext_proc/http_client/BUILD @@ -25,3 +25,24 @@ envoy_extension_cc_test( "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", ], ) + +envoy_extension_cc_test( + name = "ext_proc_http_integration_test", + srcs = ["ext_proc_http_integration_test.cc"], + extension_names = ["envoy.filters.http.ext_proc"], + rbe_pool = "2core", + shard_count = 8, + tags = [ + "cpu:3", + "skip_on_windows", + ], + deps = [ + "//source/extensions/filters/http/ext_proc:config", + "//test/common/http:common_lib", + "//test/extensions/filters/http/ext_proc:utils_lib", + "//test/integration:http_protocol_integration_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", + "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", + ], +) diff --git a/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc b/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc new file mode 100644 index 000000000000..1b8e9899fc27 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc @@ -0,0 +1,493 @@ +#include +#include + +#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" +#include "envoy/service/ext_proc/v3/external_processor.pb.h" + +#include "source/extensions/filters/http/ext_proc/config.h" +#include "source/extensions/filters/http/ext_proc/ext_proc.h" + +#include "test/common/http/common.h" +#include "test/extensions/filters/http/ext_proc/utils.h" +#include "test/integration/http_protocol_integration.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { +namespace { + +using envoy::extensions::filters::http::ext_proc::v3::ProcessingMode; +using envoy::service::ext_proc::v3::BodyResponse; +using envoy::service::ext_proc::v3::CommonResponse; +using envoy::service::ext_proc::v3::HeadersResponse; +using envoy::service::ext_proc::v3::HttpBody; +using envoy::service::ext_proc::v3::HttpHeaders; +using envoy::service::ext_proc::v3::HttpTrailers; +using envoy::service::ext_proc::v3::ProcessingRequest; +using envoy::service::ext_proc::v3::ProcessingResponse; +using envoy::service::ext_proc::v3::TrailersResponse; +using Extensions::HttpFilters::ExternalProcessing::HasHeader; +using Extensions::HttpFilters::ExternalProcessing::HasNoHeader; +using Extensions::HttpFilters::ExternalProcessing::HeaderProtosEqual; +using Extensions::HttpFilters::ExternalProcessing::SingleHeaderValueIs; + +using Http::LowerCaseString; + +struct ConfigOptions { + bool downstream_filter = true; + bool failure_mode_allow = false; + int64_t timeout = 900000000; + std::string cluster = "ext_proc_server_0"; +}; + +struct ExtProcHttpTestParams { + Network::Address::IpVersion version; + Http::CodecType downstream_protocol; + Http::CodecType upstream_protocol; +}; + +class ExtProcHttpClientIntegrationTest : public testing::TestWithParam, + public HttpIntegrationTest { +public: + ExtProcHttpClientIntegrationTest() + : HttpIntegrationTest(GetParam().downstream_protocol, GetParam().version) {} + void createUpstreams() override { + HttpIntegrationTest::createUpstreams(); + + // Create separate "upstreams" for ExtProc side stream servers + for (int i = 0; i < side_stream_count_; ++i) { + http_side_upstreams_.push_back(&addFakeUpstream(Http::CodecType::HTTP2)); + } + } + + void TearDown() override { + if (processor_connection_) { + ASSERT_TRUE(processor_connection_->close()); + ASSERT_TRUE(processor_connection_->waitForDisconnect()); + } + cleanupUpstreamAndDownstream(); + } + + void initializeConfig(ConfigOptions config_option = {}, + const std::vector>& cluster_endpoints = {{0, 1}, + {1, 1}}) { + int total_cluster_endpoints = 0; + std::for_each( + cluster_endpoints.begin(), cluster_endpoints.end(), + [&total_cluster_endpoints](const auto& item) { total_cluster_endpoints += item.second; }); + ASSERT_EQ(total_cluster_endpoints, side_stream_count_); + + config_helper_.addConfigModifier([this, cluster_endpoints, config_option]( + envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Ensure "HTTP2 with no prior knowledge." + ConfigHelper::setHttp2( + *(bootstrap.mutable_static_resources()->mutable_clusters()->Mutable(0))); + + // Clusters for ExtProc servers, starting by copying an existing cluster. + for (const auto& [cluster_id, endpoint_count] : cluster_endpoints) { + auto* server_cluster = bootstrap.mutable_static_resources()->add_clusters(); + server_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + std::string cluster_name = absl::StrCat("ext_proc_server_", cluster_id); + server_cluster->set_name(cluster_name); + server_cluster->mutable_load_assignment()->set_cluster_name(cluster_name); + ASSERT_EQ(server_cluster->load_assignment().endpoints_size(), 1); + auto* endpoints = server_cluster->mutable_load_assignment()->mutable_endpoints(0); + ASSERT_EQ(endpoints->lb_endpoints_size(), 1); + for (int i = 1; i < endpoint_count; ++i) { + auto* new_lb_endpoint = endpoints->add_lb_endpoints(); + new_lb_endpoint->MergeFrom(endpoints->lb_endpoints(0)); + } + } + + auto* http_uri = + proto_config_.mutable_http_service()->mutable_http_service()->mutable_http_uri(); + http_uri->set_uri("ext_proc_server_0:9000"); + http_uri->set_cluster(config_option.cluster); + http_uri->mutable_timeout()->set_nanos(config_option.timeout); + + if (config_option.failure_mode_allow) { + proto_config_.set_failure_mode_allow(true); + } + std::string ext_proc_filter_name = "envoy.filters.http.ext_proc"; + if (config_option.downstream_filter) { + // Construct a configuration proto for our filter and then re-write it + // to JSON so that we can add it to the overall config + envoy::extensions::filters::network::http_connection_manager::v3::HttpFilter + ext_proc_filter; + ext_proc_filter.set_name(ext_proc_filter_name); + ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_); + config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter)); + } + }); + + setUpstreamProtocol(GetParam().upstream_protocol); + setDownstreamProtocol(GetParam().downstream_protocol); + } + + IntegrationStreamDecoderPtr sendDownstreamRequest( + absl::optional> modify_headers) { + auto conn = makeClientConnection(lookupPort("http")); + codec_client_ = makeHttpConnection(std::move(conn)); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + if (modify_headers) { + (*modify_headers)(headers); + } + return codec_client_->makeHeaderOnlyRequest(headers); + } + + IntegrationStreamDecoderPtr sendDownstreamRequestWithBodyAndTrailer(absl::string_view body) { + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + + auto encoder_decoder = codec_client_->startRequest(headers); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + codec_client_->sendData(*request_encoder_, body, false); + Http::TestRequestTrailerMapImpl request_trailers{{"x-trailer-foo", "yes"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + + return response; + } + + void getAndCheckHttpRequest(FakeUpstream* side_stream, bool first_message = false) { + if (first_message) { + ASSERT_TRUE(side_stream->waitForHttpConnection(*dispatcher_, processor_connection_)); + } + + ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); + ASSERT_TRUE(processor_stream_->waitForEndStream(*dispatcher_)); + EXPECT_THAT(processor_stream_->headers(), + SingleHeaderValueIs("content-type", "application/json")); + EXPECT_THAT(processor_stream_->headers(), SingleHeaderValueIs(":method", "POST")); + EXPECT_THAT(processor_stream_->headers(), HasHeader("x-request-id")); + } + + void sendHttpResponse(ProcessingResponse& response) { + // Sending 200 response with the ProcessingResponse JSON encoded in the body. + std::string response_str = MessageUtil::getJsonStringFromMessageOrError(response, true, true); + processor_stream_->encodeHeaders( + Http::TestResponseHeaderMapImpl{{":status", "200"}, {"content-type", "application/json"}}, + false); + processor_stream_->encodeData(response_str, true); + } + + void processRequestHeadersMessage( + FakeUpstream* side_stream, bool first_message, + absl::optional> cb, + bool send_bad_resp = false) { + getAndCheckHttpRequest(side_stream, first_message); + + if (send_bad_resp) { + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "400"}}, true); + return; + } + // The ext_proc ProcessingRequest message is JSON encoded in the body of the HTTP message. + std::string body = processor_stream_->body().toString(); + ProcessingRequest request; + bool has_unknown_field; + auto status = MessageUtil::loadFromJsonNoThrow(body, request, has_unknown_field); + if (status.ok()) { + ProcessingResponse response; + auto* headers = response.mutable_request_headers(); + const bool sendReply = !cb || (*cb)(request.request_headers(), *headers); + if (sendReply) { + sendHttpResponse(response); + } + } else { + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "400"}}, true); + } + } + + void processResponseHeadersMessage( + FakeUpstream* side_stream, bool first_message, + absl::optional> cb) { + getAndCheckHttpRequest(side_stream, first_message); + + std::string body = processor_stream_->body().toString(); + ProcessingRequest request; + bool has_unknown_field; + auto status = MessageUtil::loadFromJsonNoThrow(body, request, has_unknown_field); + if (status.ok()) { + ProcessingResponse response; + auto* headers = response.mutable_response_headers(); + const bool sendReply = !cb || (*cb)(request.response_headers(), *headers); + if (sendReply) { + sendHttpResponse(response); + } + } else { + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "400"}}, true); + } + } + + void handleUpstreamRequest() { + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + } + + void handleUpstreamRequestWithTrailer() { + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, false); + upstream_request_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"x-test-trailers", "Yes"}}); + } + + void verifyDownstreamResponse(IntegrationStreamDecoder& response, int status_code) { + ASSERT_TRUE(response.waitForEndStream()); + EXPECT_TRUE(response.complete()); + EXPECT_EQ(std::to_string(status_code), response.headers().getStatusValue()); + } + + static std::vector getValuesForExtProcHttpTest() { + std::vector ret; + for (auto ip_version : TestEnvironment::getIpVersionsForTest()) { + for (auto downstream_protocol : {Http::CodecType::HTTP1, Http::CodecType::HTTP2}) { + for (auto upstream_protocol : {Http::CodecType::HTTP1, Http::CodecType::HTTP2}) { + ExtProcHttpTestParams params; + params.version = ip_version; + params.downstream_protocol = downstream_protocol; + params.upstream_protocol = upstream_protocol; + ret.push_back(params); + } + } + } + return ret; + } + + static std::string + ExtProcHttpTestParamsToString(const ::testing::TestParamInfo& params) { + return absl::StrCat( + (params.param.version == Network::Address::IpVersion::v4 ? "IPv4_" : "IPv6_"), + (params.param.downstream_protocol == Http::CodecType::HTTP1 ? "HTTP1_DS_" : "HTTP2_DS_"), + (params.param.upstream_protocol == Http::CodecType::HTTP1 ? "HTTP1_US" : "HTTP2_US")); + } + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config_{}; + std::vector http_side_upstreams_; + FakeHttpConnectionPtr processor_connection_; + FakeStreamPtr processor_stream_; + // Number of side stream servers in the test. + int side_stream_count_ = 2; +}; + +INSTANTIATE_TEST_SUITE_P( + Protocols, ExtProcHttpClientIntegrationTest, + testing::ValuesIn(ExtProcHttpClientIntegrationTest::getValuesForExtProcHttpTest()), + ExtProcHttpClientIntegrationTest::ExtProcHttpTestParamsToString); + +// Side stream server does not mutate the request header. +TEST_P(ExtProcHttpClientIntegrationTest, ServerNoRequestHeaderMutation) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("foo"), "yes"); }); + + // The side stream get the request and sends back the response. + processRequestHeadersMessage(http_side_upstreams_[0], true, absl::nullopt); + + // The request is sent to the upstream. + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("foo", "yes")); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +// Side stream server does not mutate the response header. +TEST_P(ExtProcHttpClientIntegrationTest, ServerNoResponseHeaderMutation) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("foo"), "yes"); }); + + // The request is sent to the upstream. + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("foo", "yes")); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + processResponseHeadersMessage(http_side_upstreams_[0], true, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +// Side stream server adds and removes headers from the header request. +TEST_P(ExtProcHttpClientIntegrationTest, GetAndSetHeadersWithMutation) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("x-remove-this"), "yes"); }); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders& headers, HeadersResponse& headers_resp) { + Http::TestRequestHeaderMapImpl expected_request_headers{ + {":scheme", "http"}, {":method", "GET"}, {"host", "host"}, + {":path", "/"}, {"x-remove-this", "yes"}, {"x-forwarded-proto", "http"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_request_headers)); + + auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation(); + auto* mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_raw_value("new"); + response_header_mutation->add_remove_headers("x-remove-this"); + return true; + }); + + // The request is sent to the upstream. + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_THAT(upstream_request_->headers(), HasNoHeader("x-remove-this")); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +// Side stream server does not send response trigger timeout. +TEST_P(ExtProcHttpClientIntegrationTest, ServerNoResponseFilterTimeout) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage(http_side_upstreams_[0], true, + [this](const HttpHeaders&, HeadersResponse&) { + // Travel forward 400 ms exceeding 200ms filter timeout. + timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(400)); + return false; + }); + // ext_proc filter timeouts sends a 504 local reply depending on runtime flag. + verifyDownstreamResponse(*response, 504); +} + +// Http timeout value set to 10ms. Test HTTP timeout. +TEST_P(ExtProcHttpClientIntegrationTest, ServerResponseHttpClientTimeout) { + ConfigOptions config_option = {}; + config_option.timeout = 10000000; + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage(http_side_upstreams_[0], true, + [this](const HttpHeaders&, HeadersResponse&) { + // Travel forward 50 ms exceeding 10ms HTTP URI timeout setting. + timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(50)); + return true; + }); + + // HTTP client timeouts sends a 500 local reply. + verifyDownstreamResponse(*response, 500); +} + +// Side stream server sends back 400 with fail-mode-allow set to false. +TEST_P(ExtProcHttpClientIntegrationTest, ServerSendsBackBadRequestFailClose) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse&) { return true; }, + true); + + verifyDownstreamResponse(*response, 500); +} + +// Side stream server sends back 400 with fail-mode-allow set to true. +TEST_P(ExtProcHttpClientIntegrationTest, ServerSendsBackBadRequestFailOpen) { + ConfigOptions config_option = {}; + config_option.failure_mode_allow = true; + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse&) { return true; }, + true); + + // The request is sent to the upstream. + handleUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +// Send headers in both directions. +TEST_P(ExtProcHttpClientIntegrationTest, SentHeadersInBothDirection) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequestWithBodyAndTrailer("foo"); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders& headers, HeadersResponse& headers_resp) { + Http::TestRequestHeaderMapImpl expected_request_headers{{":scheme", "http"}, + {":method", "GET"}, + {"host", "host"}, + {":path", "/"}, + {"x-forwarded-proto", "http"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_request_headers)); + + auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation(); + auto* mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_raw_value("new"); + return true; + }); + + // The request is sent to the upstream. + handleUpstreamRequestWithTrailer(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), "foo"); + + processResponseHeadersMessage(http_side_upstreams_[0], false, absl::nullopt); + verifyDownstreamResponse(*response, 200); +} + +// Wrong ext_proc filter cluster config with fail close. +TEST_P(ExtProcHttpClientIntegrationTest, WrongClusterConfigWithFailClose) { + ConfigOptions config_option = {}; + config_option.failure_mode_allow = false; + config_option.cluster = "foo"; + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + verifyDownstreamResponse(*response, 504); +} + +// Wrong ext_proc filter cluster config with fail open +TEST_P(ExtProcHttpClientIntegrationTest, WrongClusterConfigWithFailOpen) { + ConfigOptions config_option = {}; + config_option.failure_mode_allow = true; + config_option.cluster = "foo"; + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + // The request is sent to the upstream. + handleUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +} // namespace +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/http_client/http_client_test.cc b/test/extensions/filters/http/ext_proc/http_client/http_client_test.cc index 27a6d83c9ef1..faca0383df61 100644 --- a/test/extensions/filters/http/ext_proc/http_client/http_client_test.cc +++ b/test/extensions/filters/http/ext_proc/http_client/http_client_test.cc @@ -18,10 +18,26 @@ class ExtProcHttpClientTest : public testing::Test { public: ~ExtProcHttpClientTest() override = default; - void SetUp() override { client_ = std::make_unique(config_, context_); } + const std::string default_http_config_ = R"EOF( + http_service: + http_service: + http_uri: + uri: "ext_proc_server_0:9000" + cluster: "ext_proc_server_0" + timeout: + seconds: 500 + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SKIP" + )EOF"; + + void SetUp() override { + TestUtility::loadFromYaml(default_http_config_, config_); + client_ = std::make_unique(config_, context_); + } protected: - envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor config_; + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor config_{}; testing::NiceMock context_; Upstream::MockClusterManager& cm_{context_.cluster_manager_}; std::unique_ptr client_; @@ -31,30 +47,49 @@ class ExtProcHttpClientTest : public testing::Test { TEST_F(ExtProcHttpClientTest, Basic) { SetUp(); - client_->sendRequest(); + client_->cancel(); client_->context(); Tracing::MockSpan parent_span; client_->onBeforeFinalizeUpstreamSpan(parent_span, nullptr); Http::AsyncClient::FailureReason reason = Envoy::Http::AsyncClient::FailureReason::Reset; client_->onFailure(async_request_, reason); + reason = Envoy::Http::AsyncClient::FailureReason::ExceedResponseBufferLimit; + client_->onFailure(async_request_, reason); +} - Http::ResponseHeaderMapPtr resp_headers_ok(new Http::TestResponseHeaderMapImpl({ +TEST_F(ExtProcHttpClientTest, JsonDecodingErrorTest) { + Http::ResponseHeaderMapPtr resp_headers(new Http::TestResponseHeaderMapImpl({ {":status", "200"}, })); - Http::ResponseMessagePtr response_ok(new Http::ResponseMessageImpl(std::move(resp_headers_ok))); - client_->onSuccess(async_request_, std::move(response_ok)); + Http::ResponseMessagePtr response_ok_with_bad_body( + new Http::ResponseMessageImpl(std::move(resp_headers))); + response_ok_with_bad_body->body().add("foo-bar"); + client_->onSuccess(async_request_, std::move(response_ok_with_bad_body)); +} +TEST_F(ExtProcHttpClientTest, EmptyResponseBodyTest) { + Http::ResponseHeaderMapPtr resp_headers(new Http::TestResponseHeaderMapImpl({ + {":status", "200"}, + })); + Http::ResponseMessagePtr response_ok_with_empty_body( + new Http::ResponseMessageImpl(std::move(resp_headers))); + client_->onSuccess(async_request_, std::move(response_ok_with_empty_body)); +} + +TEST_F(ExtProcHttpClientTest, ResponseStatusNotOkTest) { Http::ResponseHeaderMapPtr resp_headers(new Http::TestResponseHeaderMapImpl({ {":status", "403"}, })); Http::ResponseMessagePtr response(new Http::ResponseMessageImpl(std::move(resp_headers))); client_->onSuccess(async_request_, std::move(response)); +} - Http::ResponseHeaderMapPtr resp_headers_foo(new Http::TestResponseHeaderMapImpl({ +TEST_F(ExtProcHttpClientTest, WrongResponseStatusTest) { + Http::ResponseHeaderMapPtr resp_headers(new Http::TestResponseHeaderMapImpl({ {":status", "foo"}, })); - Http::ResponseMessagePtr response_foo(new Http::ResponseMessageImpl(std::move(resp_headers_foo))); + Http::ResponseMessagePtr response_foo(new Http::ResponseMessageImpl(std::move(resp_headers))); client_->onSuccess(async_request_, std::move(response_foo)); } diff --git a/test/extensions/filters/http/ext_proc/mock_server.cc b/test/extensions/filters/http/ext_proc/mock_server.cc index 29637f793f83..35c91b238e3a 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.cc +++ b/test/extensions/filters/http/ext_proc/mock_server.cc @@ -5,12 +5,17 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -MockClient::MockClient() { - EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; })); +using ::testing::_; +using ::testing::Invoke; - EXPECT_CALL(*this, setStream(testing::_)) +MockClient::MockClient() { + EXPECT_CALL(*this, sendRequest(_, _, _, _, _)) .WillRepeatedly( - testing::Invoke([this](ExternalProcessorStream* stream) -> void { stream_ = stream; })); + Invoke([](envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream, + const uint64_t, RequestCallbacks*, StreamBase* stream) { + ExternalProcessorStream* grpc_stream = dynamic_cast(stream); + grpc_stream->send(std::move(request), end_stream); + })); } MockClient::~MockClient() = default; diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index 12c9d7308a93..a4962acb4e74 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -13,14 +13,15 @@ class MockClient : public ExternalProcessorClient { public: MockClient(); ~MockClient() override; + MOCK_METHOD(ExternalProcessorStreamPtr, start, (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); - MOCK_METHOD(ExternalProcessorStream*, stream, ()); - MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream)); - - ExternalProcessorStream* stream_ = nullptr; + MOCK_METHOD(void, sendRequest, + (envoy::service::ext_proc::v3::ProcessingRequest&&, bool, const uint64_t, + RequestCallbacks*, StreamBase*)); + MOCK_METHOD(void, cancel, ()); }; class MockStream : public ExternalProcessorStream { diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index 4b1e4cf1330f..a12be075b034 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -76,7 +76,7 @@ class OrderingTest : public testing::Test { std::make_shared( Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), factory_context_); - filter_ = std::make_unique(config_, std::move(client_), proto_config.grpc_service()); + filter_ = std::make_unique(config_, std::move(client_)); filter_->setEncoderFilterCallbacks(encoder_callbacks_); filter_->setDecoderFilterCallbacks(decoder_callbacks_); } diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index 25c6b3ac2d76..f8635e18598d 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -98,7 +98,7 @@ DEFINE_PROTO_FUZZER( ExternalProcessing::MockClient* client = new ExternalProcessing::MockClient(); std::unique_ptr filter = std::make_unique( - config, ExternalProcessing::ExternalProcessorClientPtr{client}, proto_config.grpc_service()); + config, ExternalProcessing::ExternalProcessorClientPtr{client}); filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); filter->setEncoderFilterCallbacks(mocks.encoder_callbacks_); diff --git a/test/extensions/filters/http/ext_proc/utils.h b/test/extensions/filters/http/ext_proc/utils.h index 858e8e7cb01d..351d40bd747e 100644 --- a/test/extensions/filters/http/ext_proc/utils.h +++ b/test/extensions/filters/http/ext_proc/utils.h @@ -31,6 +31,10 @@ MATCHER_P(HasNoHeader, key, absl::StrFormat("Headers have no value for \"%s\"", return arg.get(::Envoy::Http::LowerCaseString(std::string(key))).empty(); } +MATCHER_P(HasHeader, key, absl::StrFormat("There exists a header for \"%s\"", key)) { + return !arg.get(::Envoy::Http::LowerCaseString(std::string(key))).empty(); +} + MATCHER_P2(SingleHeaderValueIs, key, value, absl::StrFormat("Header \"%s\" equals \"%s\"", key, value)) { const auto hdr = arg.get(::Envoy::Http::LowerCaseString(std::string(key)));