Skip to content

Commit

Permalink
Ext_proc refactoring: Move stream object from Filter class to client (e…
Browse files Browse the repository at this point in the history
…nvoyproxy#36228)

This PR is part of the required refactoring needed to support HTTP
client in ext_proc: envoyproxy#35488

It is also to address a comment of
envoyproxy#35740 (comment)

---------

Signed-off-by: Yanjun Xiang <[email protected]>
  • Loading branch information
yanjunxiang-google authored Sep 25, 2024
1 parent 029e808 commit 41a3783
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 84 deletions.
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class ExternalProcessorClient {
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE;
virtual ExternalProcessorStream* stream() PURE;
virtual void setStream(ExternalProcessorStream* stream) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override;
ExternalProcessorStream* stream() override { return stream_; }
void setStream(ExternalProcessorStream* stream) override { stream_ = stream; }

private:
Grpc::AsyncClientManager& client_manager_;
Stats::Scope& scope_;
// The gRPC stream to the external processor, which will be opened
// when it's time to send the first message.
ExternalProcessorStream* stream_ = nullptr;
};

class ExternalProcessorStreamImpl : public ExternalProcessorStream,
Expand Down
43 changes: 23 additions & 20 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ Filter::StreamOpenState Filter::openStream() {
ENVOY_LOG(debug, "External processing is completed when trying to open the gRPC stream");
return StreamOpenState::IgnoreError;
}
if (!stream_) {
if (!client_->stream()) {
ENVOY_LOG(debug, "Opening gRPC stream to external processor");

Http::AsyncClient::ParentContext grpc_context;
Expand All @@ -354,40 +354,42 @@ Filter::StreamOpenState Filter::openStream() {

if (processing_complete_) {
// Stream failed while starting and either onGrpcError or onGrpcClose was already called
// Asserts that `stream_` is nullptr since it is not valid to be used any further
// Asserts that `stream_object` is nullptr since it is not valid to be used any further
// beyond this point.
ASSERT(stream_object == nullptr);
return sent_immediate_response_ ? StreamOpenState::Error : StreamOpenState::IgnoreError;
}
stats_.streams_started_.inc();

stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(),
config_->deferredCloseTimeout());
ExternalProcessorStream* stream = config_->threadLocalStreamManager().store(
std::move(stream_object), config_->stats(), config_->deferredCloseTimeout());
client_->setStream(stream);
// For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not
// have a proper implementation of streamInfo.
if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) {
logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo());
logging_info_->setClusterInfo(client_->stream()->streamInfo().upstreamClusterInfo());
}
}
return StreamOpenState::Ok;
}

void Filter::closeStream() {
if (stream_) {
if (client_->stream()) {
ENVOY_LOG(debug, "Calling close on stream");
if (stream_->close()) {
if (client_->stream()->close()) {
stats_.streams_closed_.inc();
}
config_->threadLocalStreamManager().erase(stream_);
stream_ = nullptr;
config_->threadLocalStreamManager().erase(client_->stream());
client_->setStream(nullptr);
} else {
ENVOY_LOG(debug, "Stream already closed");
}
}

void Filter::deferredCloseStream() {
ENVOY_LOG(debug, "Calling deferred close on stream");
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher());
config_->threadLocalStreamManager().deferredErase(client_->stream(),
filter_callbacks_->dispatcher());
}

void Filter::onDestroy() {
Expand All @@ -405,8 +407,8 @@ void Filter::onDestroy() {
// closure is deferred upon filter destruction with a timer.

// First, release the referenced filter resource.
if (stream_ != nullptr) {
stream_->notifyFilterDestroy();
if (client_->stream() != nullptr) {
client_->stream()->notifyFilterDestroy();
}

// Second, perform stream deferred closure.
Expand Down Expand Up @@ -436,7 +438,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
state.setPaused(true);
return FilterHeadersStatus::StopIteration;
Expand Down Expand Up @@ -661,7 +663,7 @@ Filter::sendHeadersInObservabilityMode(Http::RequestOrResponseHeaderMap& headers
ProcessingRequest req =
buildHeaderRequest(state, headers, end_stream, /*observability_mode=*/true);
ENVOY_LOG(debug, "Sending headers message in observability mode");
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();

return FilterHeadersStatus::Continue;
Expand All @@ -686,7 +688,7 @@ Http::FilterDataStatus Filter::sendDataInObservabilityMode(Buffer::Instance& dat
// Set up the the body chunk and send.
auto req = setupBodyChunk(state, data, end_stream);
req.set_observability_mode(true);
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
ENVOY_LOG(debug, "Sending body message in ObservabilityMode");
} else if (state.bodyMode() != ProcessingMode::NONE) {
Expand Down Expand Up @@ -878,7 +880,7 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState
ProcessingRequest& req) {
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
new_state);
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
}

Expand All @@ -894,20 +896,21 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::TrailersCallback);
ENVOY_LOG(debug, "Sending trailers message");
stream_->send(std::move(req), false);
client_->stream()->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
}

void Filter::logGrpcStreamInfo() {
if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) {
const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter();
if (client_->stream() != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) {
const auto& upstream_meter = client_->stream()->streamInfo().getUpstreamBytesMeter();
if (upstream_meter != nullptr) {
logging_info_->setBytesSent(upstream_meter->wireBytesSent());
logging_info_->setBytesReceived(upstream_meter->wireBytesReceived());
}
// Only set upstream host in logging info once.
if (logging_info_->upstreamHost() == nullptr) {
logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost());
logging_info_->setUpstreamHost(
client_->stream()->streamInfo().upstreamInfo()->upstreamHost());
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,10 +486,6 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
DecodingProcessorState decoding_state_;
EncodingProcessorState encoding_state_;

// The gRPC stream to the external processor, which will be opened
// when it's time to send the first message.
ExternalProcessorStream* stream_ = nullptr;

// Set to true when no more messages need to be sent to the processor.
// This happens when the processor has closed the stream, or when it has
// failed.
Expand Down
8 changes: 7 additions & 1 deletion test/extensions/filters/http/ext_proc/mock_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

MockClient::MockClient() = default;
MockClient::MockClient() {
EXPECT_CALL(*this, stream()).WillRepeatedly(testing::Invoke([this]() { return stream_; }));

EXPECT_CALL(*this, setStream(testing::_))
.WillRepeatedly(
testing::Invoke([this](ExternalProcessorStream* stream) -> void { stream_ = stream; }));
}
MockClient::~MockClient() = default;

MockStream::MockStream() = default;
Expand Down
4 changes: 4 additions & 0 deletions test/extensions/filters/http/ext_proc/mock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class MockClient : public ExternalProcessorClient {
(ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&,
const Envoy::Http::AsyncClient::StreamOptions&,
Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&));
MOCK_METHOD(ExternalProcessorStream*, stream, ());
MOCK_METHOD(void, setStream, (ExternalProcessorStream * stream));

ExternalProcessorStream* stream_ = nullptr;
};

class MockStream : public ExternalProcessorStream {
Expand Down
13 changes: 1 addition & 12 deletions test/extensions/filters/http/ext_proc/unit_test_fuzz/BUILD
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_fuzz_test",
"envoy_cc_mock",
"envoy_package",
"envoy_proto_library",
)
Expand All @@ -10,16 +9,6 @@ licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_mock(
name = "ext_proc_mocks",
hdrs = ["mocks.h"],
tags = ["skip_on_windows"],
deps = [
"//source/extensions/filters/http/ext_proc:client_interface",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
)

envoy_proto_library(
name = "ext_proc_unit_test_fuzz_proto",
srcs = ["ext_proc_unit_test_fuzz.proto"],
Expand All @@ -37,10 +26,10 @@ envoy_cc_fuzz_test(
rbe_pool = "2core",
tags = ["skip_on_windows"],
deps = [
":ext_proc_mocks",
":ext_proc_unit_test_fuzz_proto_cc_proto",
"//source/extensions/filters/http/ext_proc:config",
"//test/extensions/filters/http/common/fuzz:http_filter_fuzzer_lib",
"//test/extensions/filters/http/ext_proc:mock_server_lib",
"//test/mocks/http:http_mocks",
"//test/mocks/network:network_mocks",
"//test/mocks/server:server_factory_context_mocks",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#include "source/extensions/filters/http/ext_proc/ext_proc.h"

#include "test/extensions/filters/http/common/fuzz/http_filter_fuzzer.h"
#include "test/extensions/filters/http/ext_proc/mock_server.h"
#include "test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.pb.validate.h"
#include "test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h"
#include "test/fuzz/fuzz_runner.h"
#include "test/mocks/http/mocks.h"
#include "test/mocks/network/mocks.h"
Expand Down Expand Up @@ -69,6 +69,14 @@ DEFINE_PROTO_FUZZER(
return;
}

// Limiting the max supported request body size to 128k.
if (input.request().has_proto_body()) {
const uint32_t max_body_size = 128 * 1024;
if (input.request().proto_body().message().value().size() > max_body_size) {
return;
}
}

static FuzzerMocks mocks;
NiceMock<Stats::MockIsolatedStatsStore> stats_store;

Expand All @@ -88,7 +96,7 @@ DEFINE_PROTO_FUZZER(
return;
}

MockClient* client = new MockClient();
ExternalProcessing::MockClient* client = new ExternalProcessing::MockClient();
std::unique_ptr<ExternalProcessing::Filter> filter = std::make_unique<ExternalProcessing::Filter>(
config, ExternalProcessing::ExternalProcessorClientPtr{client}, proto_config.grpc_service());
filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_);
Expand All @@ -100,7 +108,7 @@ DEFINE_PROTO_FUZZER(
const Envoy::Http::AsyncClient::StreamOptions&,
Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&)
-> ExternalProcessing::ExternalProcessorStreamPtr {
auto stream = std::make_unique<MockStream>();
auto stream = std::make_unique<ExternalProcessing::MockStream>();
EXPECT_CALL(*stream, send(_, _))
.WillRepeatedly(Invoke([&](envoy::service::ext_proc::v3::ProcessingRequest&&,
bool) -> void {
Expand Down
44 changes: 0 additions & 44 deletions test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h

This file was deleted.

0 comments on commit 41a3783

Please sign in to comment.