From 41a378353dd465d9ed4963c2094d5db6b7c5b650 Mon Sep 17 00:00:00 2001 From: yanjunxiang-google <78807980+yanjunxiang-google@users.noreply.github.com> Date: Wed, 25 Sep 2024 19:28:36 -0400 Subject: [PATCH] Ext_proc refactoring: Move stream object from Filter class to client (#36228) This PR is part of the required refactoring needed to support HTTP client in ext_proc: https://github.com/envoyproxy/envoy/issues/35488 It is also to address a comment of https://github.com/envoyproxy/envoy/pull/35740#discussion_r1765336235 --------- Signed-off-by: Yanjun Xiang --- .../extensions/filters/http/ext_proc/client.h | 2 + .../filters/http/ext_proc/client_impl.h | 5 +++ .../filters/http/ext_proc/ext_proc.cc | 43 +++++++++--------- .../filters/http/ext_proc/ext_proc.h | 4 -- .../filters/http/ext_proc/mock_server.cc | 8 +++- .../filters/http/ext_proc/mock_server.h | 4 ++ .../http/ext_proc/unit_test_fuzz/BUILD | 13 +----- .../unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 14 ++++-- .../http/ext_proc/unit_test_fuzz/mocks.h | 44 ------------------- 9 files changed, 53 insertions(+), 84 deletions(-) delete mode 100644 test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 54493c094fe3..413bbcac7730 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -48,6 +48,8 @@ class ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE; + virtual ExternalProcessorStream* stream() PURE; + virtual void setStream(ExternalProcessorStream* stream) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 745bd3f167c8..8ef177cda00c 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -32,10 +32,15 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const Http::AsyncClient::StreamOptions& options, Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override; + ExternalProcessorStream* stream() override { return stream_; } + void setStream(ExternalProcessorStream* stream) override { stream_ = stream; } private: Grpc::AsyncClientManager& client_manager_; Stats::Scope& scope_; + // The gRPC stream to the external processor, which will be opened + // when it's time to send the first message. + ExternalProcessorStream* stream_ = nullptr; }; class ExternalProcessorStreamImpl : public ExternalProcessorStream, diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 83dde06362e2..9175293ca18f 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -339,7 +339,7 @@ Filter::StreamOpenState Filter::openStream() { ENVOY_LOG(debug, "External processing is completed when trying to open the gRPC stream"); return StreamOpenState::IgnoreError; } - if (!stream_) { + if (!client_->stream()) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); Http::AsyncClient::ParentContext grpc_context; @@ -354,32 +354,33 @@ Filter::StreamOpenState Filter::openStream() { if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called - // Asserts that `stream_` is nullptr since it is not valid to be used any further + // Asserts that `stream_object` is nullptr since it is not valid to be used any further // beyond this point. ASSERT(stream_object == nullptr); return sent_immediate_response_ ? StreamOpenState::Error : StreamOpenState::IgnoreError; } stats_.streams_started_.inc(); - stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(), - config_->deferredCloseTimeout()); + ExternalProcessorStream* stream = config_->threadLocalStreamManager().store( + std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); + client_->setStream(stream); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not // have a proper implementation of streamInfo. if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) { - logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo()); + logging_info_->setClusterInfo(client_->stream()->streamInfo().upstreamClusterInfo()); } } return StreamOpenState::Ok; } void Filter::closeStream() { - if (stream_) { + if (client_->stream()) { ENVOY_LOG(debug, "Calling close on stream"); - if (stream_->close()) { + if (client_->stream()->close()) { stats_.streams_closed_.inc(); } - config_->threadLocalStreamManager().erase(stream_); - stream_ = nullptr; + config_->threadLocalStreamManager().erase(client_->stream()); + client_->setStream(nullptr); } else { ENVOY_LOG(debug, "Stream already closed"); } @@ -387,7 +388,8 @@ void Filter::closeStream() { void Filter::deferredCloseStream() { ENVOY_LOG(debug, "Calling deferred close on stream"); - config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); + config_->threadLocalStreamManager().deferredErase(client_->stream(), + filter_callbacks_->dispatcher()); } void Filter::onDestroy() { @@ -405,8 +407,8 @@ void Filter::onDestroy() { // closure is deferred upon filter destruction with a timer. // First, release the referenced filter resource. - if (stream_ != nullptr) { - stream_->notifyFilterDestroy(); + if (client_->stream() != nullptr) { + client_->stream()->notifyFilterDestroy(); } // Second, perform stream deferred closure. @@ -436,7 +438,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); state.setPaused(true); return FilterHeadersStatus::StopIteration; @@ -661,7 +663,7 @@ Filter::sendHeadersInObservabilityMode(Http::RequestOrResponseHeaderMap& headers ProcessingRequest req = buildHeaderRequest(state, headers, end_stream, /*observability_mode=*/true); ENVOY_LOG(debug, "Sending headers message in observability mode"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); return FilterHeadersStatus::Continue; @@ -686,7 +688,7 @@ Http::FilterDataStatus Filter::sendDataInObservabilityMode(Buffer::Instance& dat // Set up the the body chunk and send. auto req = setupBodyChunk(state, data, end_stream); req.set_observability_mode(true); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); ENVOY_LOG(debug, "Sending body message in ObservabilityMode"); } else if (state.bodyMode() != ProcessingMode::NONE) { @@ -878,7 +880,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState ProcessingRequest& req) { state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), new_state); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); } @@ -894,20 +896,21 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::TrailersCallback); ENVOY_LOG(debug, "Sending trailers message"); - stream_->send(std::move(req), false); + client_->stream()->send(std::move(req), false); stats_.stream_msgs_sent_.inc(); } void Filter::logGrpcStreamInfo() { - if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { - const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter(); + if (client_->stream() != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { + const auto& upstream_meter = client_->stream()->streamInfo().getUpstreamBytesMeter(); if (upstream_meter != nullptr) { logging_info_->setBytesSent(upstream_meter->wireBytesSent()); logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); } // Only set upstream host in logging info once. if (logging_info_->upstreamHost() == nullptr) { - logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); + logging_info_->setUpstreamHost( + client_->stream()->streamInfo().upstreamInfo()->upstreamHost()); } } } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index a908a1f6f775..3f52bba5094b 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -486,10 +486,6 @@ class Filter : public Logger::Loggable, DecodingProcessorState decoding_state_; EncodingProcessorState encoding_state_; - // The gRPC stream to the external processor, which will be opened - // when it's time to send the first message. - ExternalProcessorStream* stream_ = nullptr; - // Set to true when no more messages need to be sent to the processor. // This happens when the processor has closed the stream, or when it has // failed. diff --git a/test/extensions/filters/http/ext_proc/mock_server.cc b/test/extensions/filters/http/ext_proc/mock_server.cc index 25286be792c3..29637f793f83 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.cc +++ b/test/extensions/filters/http/ext_proc/mock_server.cc @@ -5,7 +5,13 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { -MockClient::MockClient() = default; +MockClient::MockClient() { + EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; })); + + EXPECT_CALL(*this, setStream(testing::_)) + .WillRepeatedly( + testing::Invoke([this](ExternalProcessorStream* stream) -> void { stream_ = stream; })); +} MockClient::~MockClient() = default; MockStream::MockStream() = default; diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index d0b0389b0fd4..12c9d7308a93 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -17,6 +17,10 @@ class MockClient : public ExternalProcessorClient { (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); + MOCK_METHOD(ExternalProcessorStream*, stream, ()); + MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream)); + + ExternalProcessorStream* stream_ = nullptr; }; class MockStream : public ExternalProcessorStream { diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD index f42d0981a65a..66a015876a44 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD @@ -1,7 +1,6 @@ load( "//bazel:envoy_build_system.bzl", "envoy_cc_fuzz_test", - "envoy_cc_mock", "envoy_package", "envoy_proto_library", ) @@ -10,16 +9,6 @@ licenses(["notice"]) # Apache 2 envoy_package() -envoy_cc_mock( - name = "ext_proc_mocks", - hdrs = ["mocks.h"], - tags = ["skip_on_windows"], - deps = [ - "//source/extensions/filters/http/ext_proc:client_interface", - "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", - ], -) - envoy_proto_library( name = "ext_proc_unit_test_fuzz_proto", srcs = ["ext_proc_unit_test_fuzz.proto"], @@ -37,10 +26,10 @@ envoy_cc_fuzz_test( rbe_pool = "2core", tags = ["skip_on_windows"], deps = [ - ":ext_proc_mocks", ":ext_proc_unit_test_fuzz_proto_cc_proto", "//source/extensions/filters/http/ext_proc:config", "//test/extensions/filters/http/common/fuzz:http_filter_fuzzer_lib", + "//test/extensions/filters/http/ext_proc:mock_server_lib", "//test/mocks/http:http_mocks", "//test/mocks/network:network_mocks", "//test/mocks/server:server_factory_context_mocks", diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index df321fe20487..25c6b3ac2d76 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -1,8 +1,8 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" #include "test/extensions/filters/http/common/fuzz/http_filter_fuzzer.h" +#include "test/extensions/filters/http/ext_proc/mock_server.h" #include "test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.pb.validate.h" -#include "test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h" #include "test/fuzz/fuzz_runner.h" #include "test/mocks/http/mocks.h" #include "test/mocks/network/mocks.h" @@ -69,6 +69,14 @@ DEFINE_PROTO_FUZZER( return; } + // Limiting the max supported request body size to 128k. + if (input.request().has_proto_body()) { + const uint32_t max_body_size = 128 * 1024; + if (input.request().proto_body().message().value().size() > max_body_size) { + return; + } + } + static FuzzerMocks mocks; NiceMock stats_store; @@ -88,7 +96,7 @@ DEFINE_PROTO_FUZZER( return; } - MockClient* client = new MockClient(); + ExternalProcessing::MockClient* client = new ExternalProcessing::MockClient(); std::unique_ptr filter = std::make_unique( config, ExternalProcessing::ExternalProcessorClientPtr{client}, proto_config.grpc_service()); filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); @@ -100,7 +108,7 @@ DEFINE_PROTO_FUZZER( const Envoy::Http::AsyncClient::StreamOptions&, Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&) -> ExternalProcessing::ExternalProcessorStreamPtr { - auto stream = std::make_unique(); + auto stream = std::make_unique(); EXPECT_CALL(*stream, send(_, _)) .WillRepeatedly(Invoke([&](envoy::service::ext_proc::v3::ProcessingRequest&&, bool) -> void { diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h deleted file mode 100644 index 49ff067dd353..000000000000 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include "envoy/service/ext_proc/v3/external_processor.pb.h" - -#include "source/extensions/filters/http/ext_proc/client.h" - -#include "gmock/gmock.h" - -namespace Envoy { -namespace Extensions { -namespace HttpFilters { -namespace ExtProc { -namespace UnitTestFuzz { - -class MockStream : public ExternalProcessing::ExternalProcessorStream { -public: - MockStream() = default; - ~MockStream() override = default; - - MOCK_METHOD(void, send, - (envoy::service::ext_proc::v3::ProcessingRequest && request, bool end_stream)); - MOCK_METHOD(bool, close, ()); - MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override)); - MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ()); - MOCK_METHOD(void, notifyFilterDestroy, ()); -}; - -class MockClient : public ExternalProcessing::ExternalProcessorClient { -public: - MockClient() = default; - ~MockClient() override = default; - - MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start, - (ExternalProcessing::ExternalProcessorCallbacks & callbacks, - const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - const Envoy::Http::AsyncClient::StreamOptions&, - Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)); -}; - -} // namespace UnitTestFuzz -} // namespace ExtProc -} // namespace HttpFilters -} // namespace Extensions -} // namespace Envoy