Skip to content

Commit

Permalink
filters: install network filters on upstream connections envoyproxy#173
Browse files Browse the repository at this point in the history
Allow network filters to be installed via cluster configuration,
uses the same configuration syntax as listener filters.

Added the following:
- Cluster filter config: api/envoy/api/v2/cluster/filter.proto
- Test to verify filters are applied: test/common/upstream/cluster_manager_impl_test.cc
- Create filter factory and apply to new upstream connnections
- Implement Server::Configuration::FactoryContext using contexts from TransportSocketFactoryContext

NEEDS MINOR WORK: Some FactoryContext functions throw NOT_IMPLEMENTED, because
some context objects are not obviously available to the upstream code.  Needs
attention from someone better versed in the architecture, see TODO(aconway)

This does not prevent use of the feature, most contexts are available.

This feature is used by project: https://github.com/alanconway/envoy-amqp.

Signed-off-by: Alan Conway <[email protected]>
  • Loading branch information
alanconway committed Mar 5, 2019
1 parent 741df7a commit ff675d2
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 10 deletions.
1 change: 1 addition & 0 deletions api/envoy/api/v2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ api_proto_library_internal(
":eds",
"//envoy/api/v2/auth:cert",
"//envoy/api/v2/cluster:circuit_breaker",
"//envoy/api/v2/cluster:filter",
"//envoy/api/v2/cluster:outlier_detection",
"//envoy/api/v2/core:address",
"//envoy/api/v2/core:base",
Expand Down
6 changes: 6 additions & 0 deletions api/envoy/api/v2/cds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import "envoy/api/v2/discovery.proto";
import "envoy/api/v2/core/health_check.proto";
import "envoy/api/v2/core/protocol.proto";
import "envoy/api/v2/cluster/circuit_breaker.proto";
import "envoy/api/v2/cluster/filter.proto";
import "envoy/api/v2/cluster/outlier_detection.proto";
import "envoy/api/v2/eds.proto";
import "envoy/type/percent.proto";
Expand Down Expand Up @@ -574,6 +575,11 @@ message Cluster {
// If this flag is not set to true, Envoy will wait until the hosts fail active health
// checking before removing it from the cluster.
bool drain_connections_on_host_removal = 32;

// An optional list of network filters that make up the filter chain for
// outgoing connections made by the cluster. Order matters as the filters are
// processed sequentially as connection events happen.
repeated cluster.Filter filters = 38 [(gogoproto.nullable) = false];
}

// An extensible structure containing the address Envoy should bind to when
Expand Down
23 changes: 23 additions & 0 deletions api/envoy/api/v2/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,26 @@ api_go_proto_library(
name = "outlier_detection",
proto = ":outlier_detection",
)

api_proto_library_internal(
name = "filter",
srcs = ["filter.proto"],
visibility = [
"//envoy/api/v2:__pkg__",
],
deps = [
"//envoy/api/v2/auth:cert",
"//envoy/api/v2/core:address",
"//envoy/api/v2/core:base",
],
)

api_go_proto_library(
name = "filter",
proto = ":filter",
deps = [
"//envoy/api/v2/auth:cert_go_proto",
"//envoy/api/v2/core:address_go_proto",
"//envoy/api/v2/core:base_go_proto",
],
)
34 changes: 34 additions & 0 deletions api/envoy/api/v2/cluster/filter.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
syntax = "proto3";

package envoy.api.v2.cluster;

option java_outer_classname = "FilterProto";
option java_multiple_files = true;
option java_package = "io.envoyproxy.envoy.api.v2.cluster";
option go_package = "cluster";
option csharp_namespace = "Envoy.Api.V2.ClusterNS";

import "envoy/api/v2/core/address.proto";
import "envoy/api/v2/auth/cert.proto";
import "envoy/api/v2/core/base.proto";

import "google/protobuf/any.proto";
import "google/protobuf/struct.proto";

import "validate/validate.proto";
import "gogoproto/gogo.proto";

option (gogoproto.equal_all) = true;

// [#protodoc-title: Upstream filters]
message Filter {
// The name of the upstream filter to instantiate. The name must match a supported filter.
string name = 1 [(validate.rules).string.min_bytes = 1];

// Filter specific configuration which depends on the filter being
// instantiated. See the supported filters for further documentation.
oneof config_type {
google.protobuf.Struct config = 2 [deprecated = true];
google.protobuf.Any typed_config = 3;
}
}
5 changes: 5 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,11 @@ class ClusterInfo {
*/
virtual absl::optional<std::string> eds_service_name() const PURE;

/**
* Create network filters on a new upstream connection.
*/
virtual void createNetworkFilterChain(Network::Connection& connection) const PURE;

protected:
/**
* Invoked by extensionProtocolOptionsTyped.
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ envoy_cc_library(
"//include/envoy/local_info:local_info_interface",
"//include/envoy/network:dns_interface",
"//include/envoy/runtime:runtime_interface",
"//include/envoy/server:filter_config_interface",
"//include/envoy/server:transport_socket_config_interface",
"//include/envoy/ssl:context_manager_interface",
"//include/envoy/thread_local:thread_local_interface",
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/health_discovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ ProdClusterInfoFactory::createClusterInfo(const CreateClusterInfoParams& params)

return std::make_unique<ClusterInfoImpl>(params.cluster_, params.bind_config_, params.runtime_,
std::move(socket_factory), std::move(scope),
params.added_via_api_);
params.added_via_api_, factory_context);
}

void HdsCluster::startHealthchecks(AccessLog::AccessLogManager& access_log_manager,
Expand Down
98 changes: 91 additions & 7 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& clu
cluster.transportSocketFactory().createTransportSocket(transport_socket_options),
connection_options);
connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());
cluster.createNetworkFilterChain(*connection);
return connection;
}

Expand Down Expand Up @@ -509,11 +510,63 @@ ClusterLoadReportStats ClusterInfoImpl::generateLoadReportStats(Stats::Scope& sc
return {ALL_CLUSTER_LOAD_REPORT_STATS(POOL_COUNTER(scope))};
}

ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config,
const envoy::api::v2::core::BindConfig& bind_config,
Runtime::Loader& runtime,
Network::TransportSocketFactoryPtr&& socket_factory,
Stats::ScopePtr&& stats_scope, bool added_via_api)
// Implements the FactoryContext interface required by network filters.
class ClusterInfoImpl::FactoryContextImpl : public Server::Configuration::FactoryContext {
public:
// Create from a TransportSocketFactoryContext using parent stats_scope and runtime
// other contexts taken from TransportSocketFactoryContext.
FactoryContextImpl(Stats::Scope& stats_scope, Envoy::Runtime::Loader& runtime,
Server::Configuration::TransportSocketFactoryContext& c)
: admin_(c.admin()), stats_scope_(stats_scope), cluster_manager_(c.clusterManager()),
local_info_(c.localInfo()), dispatcher_(c.dispatcher()), random_(c.random()),
runtime_(runtime), singleton_manager_(c.singletonManager()), tls_(c.threadLocal()),
init_manager_(c.initManager()), api_(c.api()) {}

// TODO(aconway) some contexts are not obviously available in the upstream
// code, and will throw NOT_IMPLEMENTED. These should be implemented by
// someone with a better understanding of the lifecycle and role of each context.
AccessLog::AccessLogManager& accessLogManager() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Upstream::ClusterManager& clusterManager() override { return cluster_manager_; }
Event::Dispatcher& dispatcher() override { return dispatcher_; }
const Network::DrainDecision& drainDecision() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
bool healthCheckFailed() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Tracing::HttpTracer& httpTracer() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Init::Manager& initManager() override { return *init_manager_; }
const LocalInfo::LocalInfo& localInfo() const override { return local_info_; }
Envoy::Runtime::RandomGenerator& random() override { return random_; }
virtual Envoy::Runtime::Loader& runtime() override { return runtime_; }
Stats::Scope& scope() override { return stats_scope_; }
Singleton::Manager& singletonManager() override { return singleton_manager_; }
ThreadLocal::SlotAllocator& threadLocal() override { return tls_; }
Server::Admin& admin() override { return admin_; }
Stats::Scope& listenerScope() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
const envoy::api::v2::core::Metadata& listenerMetadata() const override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
TimeSource& timeSource() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Server::OverloadManager& overloadManager() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Http::Context& httpContext() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Api::Api& api() override { return api_; }

private:
Server::Admin& admin_;
Stats::Scope& stats_scope_;
Upstream::ClusterManager& cluster_manager_;
const LocalInfo::LocalInfo& local_info_;
Event::Dispatcher& dispatcher_;
Envoy::Runtime::RandomGenerator& random_;
Envoy::Runtime::Loader& runtime_;
Singleton::Manager& singleton_manager_;
ThreadLocal::SlotAllocator& tls_;
Init::Manager* init_manager_{};
Api::Api& api_;
};

ClusterInfoImpl::ClusterInfoImpl(
const envoy::api::v2::Cluster& config, const envoy::api::v2::core::BindConfig& bind_config,
Runtime::Loader& runtime, Network::TransportSocketFactoryPtr&& socket_factory,
Stats::ScopePtr&& stats_scope, bool added_via_api,
Server::Configuration::TransportSocketFactoryContext& factory_context)
: runtime_(runtime), name_(config.name()), type_(config.type()),
max_requests_per_connection_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_requests_per_connection, 0)),
Expand All @@ -537,7 +590,9 @@ ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config,
metadata_(config.metadata()), typed_metadata_(config.metadata()),
common_lb_config_(config.common_lb_config()),
cluster_socket_options_(parseClusterSocketOptions(config, bind_config)),
drain_connections_on_host_removal_(config.drain_connections_on_host_removal()) {
drain_connections_on_host_removal_(config.drain_connections_on_host_removal()),
factory_context_(
std::make_unique<FactoryContextImpl>(*stats_scope_, runtime, factory_context)) {
switch (config.lb_policy()) {
case envoy::api::v2::Cluster::ROUND_ROBIN:
lb_type_ = LoadBalancerType::RoundRobin;
Expand Down Expand Up @@ -595,6 +650,29 @@ ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config,
// https://github.com/lyft/protoc-gen-validate/issues/97 resolved. This just provides early
// validation of sanity of fields that we should catch at config ingestion.
DurationUtil::durationToMilliseconds(common_lb_config_.update_merge_window());

// Create upstream filter factories
auto filters = config.filters();
for (ssize_t i = 0; i < filters.size(); i++) {
const auto& proto_config = filters[i];
const ProtobufTypes::String name = proto_config.name();
const Json::ObjectSharedPtr filter_config =
MessageUtil::getJsonObjectFromMessage(proto_config.config());
ENVOY_LOG(debug, "filter #{} name: {} config: {}", i, name, filter_config->asJsonString());
// Now see if there is a factory that will accept the config.
auto& factory =
Config::Utility::getAndCheckFactory<Server::Configuration::NamedNetworkFilterConfigFactory>(
name);
Network::FilterFactoryCb callback;
if (filter_config->getBoolean("deprecated_v1", false)) {
callback =
factory.createFilterFactory(*filter_config->getObject("value", true), *factory_context_);
} else {
auto message = Config::Utility::translateToFactoryConfig(proto_config, factory);
callback = factory.createFilterFactoryFromProto(*message, *factory_context_);
}
filter_factories_.push_back(callback);
}
}

ProtocolOptionsConfigConstSharedPtr
Expand Down Expand Up @@ -640,6 +718,12 @@ Network::TransportSocketFactoryPtr createTransportSocketFactory(
return config_factory.createTransportSocketFactory(*message, factory_context);
}

void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const {
for (const auto& factory : filter_factories_) {
factory(connection);
}
}

ClusterSharedPtr ClusterImplBase::create(
const envoy::api::v2::Cluster& cluster, ClusterManager& cm, Stats::Store& stats,
ThreadLocal::Instance& tls, Network::DnsResolverSharedPtr dns_resolver,
Expand Down Expand Up @@ -736,7 +820,7 @@ ClusterImplBase::ClusterImplBase(
auto socket_factory = createTransportSocketFactory(cluster, factory_context);
info_ = std::make_unique<ClusterInfoImpl>(cluster, factory_context.clusterManager().bindConfig(),
runtime, std::move(socket_factory),
std::move(stats_scope), added_via_api);
std::move(stats_scope), added_via_api, factory_context);
// Create the default (empty) priority set before registering callbacks to
// avoid getting an update the first time it is accessed.
priority_set_.getOrCreateHostSet(0);
Expand Down
13 changes: 11 additions & 2 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include "envoy/event/timer.h"
#include "envoy/local_info/local_info.h"
#include "envoy/network/dns.h"
#include "envoy/network/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/secret/secret_manager.h"
#include "envoy/server/filter_config.h"
#include "envoy/server/transport_socket_config.h"
#include "envoy/ssl/context_manager.h"
#include "envoy/stats/scope.h"
Expand Down Expand Up @@ -472,12 +474,13 @@ class PrioritySetImpl : public PrioritySet {
/**
* Implementation of ClusterInfo that reads from JSON.
*/
class ClusterInfoImpl : public ClusterInfo {
class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable<Logger::Id::upstream> {
public:
ClusterInfoImpl(const envoy::api::v2::Cluster& config,
const envoy::api::v2::core::BindConfig& bind_config, Runtime::Loader& runtime,
Network::TransportSocketFactoryPtr&& socket_factory,
Stats::ScopePtr&& stats_scope, bool added_via_api);
Stats::ScopePtr&& stats_scope, bool added_via_api,
Server::Configuration::TransportSocketFactoryContext&);

static ClusterStats generateStats(Stats::Scope& scope);
static ClusterLoadReportStats generateLoadReportStats(Stats::Scope& scope);
Expand Down Expand Up @@ -539,6 +542,8 @@ class ClusterInfoImpl : public ClusterInfo {

absl::optional<std::string> eds_service_name() const override { return eds_service_name_; }

void createNetworkFilterChain(Network::Connection&) const;

private:
struct ResourceManagers {
ResourceManagers(const envoy::api::v2::Cluster& config, Runtime::Loader& runtime,
Expand Down Expand Up @@ -582,6 +587,10 @@ class ClusterInfoImpl : public ClusterInfo {
const Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options_;
const bool drain_connections_on_host_removal_;
absl::optional<std::string> eds_service_name_;

class FactoryContextImpl;
std::unique_ptr<Server::Configuration::FactoryContext> factory_context_;
std::vector<Network::FilterFactoryCb> filter_factories_;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions test/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ envoy_cc_test(
"//source/common/network:utility_lib",
"//source/common/stats:stats_lib",
"//source/common/upstream:cluster_manager_lib",
"//source/extensions/filters/network/echo",
"//source/extensions/filters/network/echo:config",
"//source/extensions/transport_sockets/raw_buffer:config",
"//source/extensions/transport_sockets/tls:context_lib",
"//test/mocks/access_log:access_log_mocks",
Expand Down
29 changes: 29 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2418,6 +2418,35 @@ TEST_F(ClusterManagerImplTest, MergedUpdatesDestroyedOnUpdate) {
EXPECT_EQ(0, factory_.stats_.gauge("cluster_manager.warming_clusters").value());
}

// Verify that configured upstream filters are added to client connections.
TEST_F(ClusterManagerImplTest, AddUpstreamFilters) {
const std::string yaml = R"EOF(
static_resources:
clusters:
- name: cluster_1
connect_timeout: 0.250s
lb_policy: ROUND_ROBIN
type: STATIC
hosts:
- socket_address:
address: "127.0.0.1"
port_value: 11001
filters:
- name: envoy.echo
)EOF";

create(parseBootstrapFromV2Yaml(yaml));
Network::MockClientConnection* connection = new NiceMock<Network::MockClientConnection>();
EXPECT_CALL(*connection, addReadFilter(_)).Times(1); // echo is a read filter.
EXPECT_CALL(*connection, addWriteFilter(_)).Times(0);
EXPECT_CALL(*connection, addFilter(_)).Times(0);
EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_, _, _, _))
.WillOnce(Return(connection));
auto conn_data = cluster_manager_->tcpConnForCluster("cluster_1", nullptr, nullptr);
EXPECT_EQ(connection, conn_data.connection_.get());
factory_.tls_.shutdownThread();
}

class ClusterManagerInitHelperTest : public testing::Test {
public:
MOCK_METHOD1(onClusterInit, void(Cluster& cluster));
Expand Down
1 change: 1 addition & 0 deletions test/mocks/upstream/cluster_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class MockClusterInfo : public ClusterInfo {
MOCK_CONST_METHOD0(clusterSocketOptions, const Network::ConnectionSocket::OptionsSharedPtr&());
MOCK_CONST_METHOD0(drainConnectionsOnHostRemoval, bool());
MOCK_CONST_METHOD0(eds_service_name, absl::optional<std::string>());
MOCK_CONST_METHOD1(createNetworkFilterChain, void(Network::Connection&));

std::string name_{"fake_cluster"};
absl::optional<std::string> eds_service_name_;
Expand Down

0 comments on commit ff675d2

Please sign in to comment.