diff --git a/api/envoy/extensions/filters/network/bumping/v3/BUILD b/api/envoy/extensions/filters/network/bumping/v3/BUILD new file mode 100644 index 000000000000..0b02b988e42f --- /dev/null +++ b/api/envoy/extensions/filters/network/bumping/v3/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/accesslog/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/api/envoy/extensions/filters/network/bumping/v3/bumping.proto b/api/envoy/extensions/filters/network/bumping/v3/bumping.proto new file mode 100644 index 000000000000..e255331a6468 --- /dev/null +++ b/api/envoy/extensions/filters/network/bumping/v3/bumping.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +package envoy.extensions.filters.network.bumping.v3; + +import "envoy/config/accesslog/v3/accesslog.proto"; + +import "google/protobuf/wrappers.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.network.bumping.v3"; +option java_outer_classname = "BumpingProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/bumping/v3;bumpingv3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Bumping] +// Bumping :ref:`configuration overview `. +// [#extension: envoy.filters.network.bumping] + +message Bumping { + // The prefix to use when emitting :ref:`statistics + // `. + string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; + + // The upstream cluster to connect to. + string cluster = 2; + + // Configuration for :ref:`access logs ` + // emitted by the this bumping filter. + repeated config.accesslog.v3.AccessLog access_log = 3; + + // The maximum number of unsuccessful connection attempts that will be made before + // giving up. If the parameter is not specified, 1 connection attempt will be made. + google.protobuf.UInt32Value max_connect_attempts = 4 [(validate.rules).uint32 = {gte: 1}]; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index 311420f088ce..7fb0a4b6f524 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -135,6 +135,7 @@ proto_library( "//envoy/extensions/filters/listener/original_src/v3:pkg", "//envoy/extensions/filters/listener/proxy_protocol/v3:pkg", "//envoy/extensions/filters/listener/tls_inspector/v3:pkg", + "//envoy/extensions/filters/network/bumping/v3:pkg", "//envoy/extensions/filters/network/connection_limit/v3:pkg", "//envoy/extensions/filters/network/direct_response/v3:pkg", "//envoy/extensions/filters/network/dubbo_proxy/router/v3:pkg", diff --git a/envoy/network/connection.h b/envoy/network/connection.h index 3cbada199d25..ec5738bbf7a3 100644 --- a/envoy/network/connection.h +++ b/envoy/network/connection.h @@ -351,6 +351,8 @@ class Connection : public Event::DeferredDeletable, * return value is cwnd(in packets) times the connection's MSS. */ virtual absl::optional congestionWindowInBytes() const PURE; + + bool write_disable{false}; }; using ConnectionPtr = std::unique_ptr; diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 796fa59922f1..64f936cc4ea1 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -588,7 +588,9 @@ void ConnectionImpl::onFileEvent(uint32_t events) { } if (events & Event::FileReadyType::Write) { - onWriteReady(); + if (!write_disable) { + onWriteReady(); + } } // It's possible for a write event callback to close the socket (which will cause fd_ to be -1). diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 10d6fc94fd32..1b8d55e2a76b 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -153,7 +153,7 @@ EXTENSIONS = { "envoy.filters.network.sni_dynamic_forward_proxy": "//source/extensions/filters/network/sni_dynamic_forward_proxy:config", "envoy.filters.network.wasm": "//source/extensions/filters/network/wasm:config", "envoy.filters.network.zookeeper_proxy": "//source/extensions/filters/network/zookeeper_proxy:config", - + "envoy.filters.network.bumping": "//source/extensions/filters/network/bumping:config", # # UDP filters # diff --git a/source/extensions/extensions_metadata.yaml b/source/extensions/extensions_metadata.yaml index f2eb669d41d8..569b93b5411c 100644 --- a/source/extensions/extensions_metadata.yaml +++ b/source/extensions/extensions_metadata.yaml @@ -612,6 +612,13 @@ envoy.filters.network.zookeeper_proxy: status: alpha type_urls: - envoy.extensions.filters.network.zookeeper_proxy.v3.ZooKeeperProxy +envoy.filters.network.bumping: + categories: + - envoy.filters.network + security_posture: unknown + status: alpha + type_urls: + - envoy.extensions.filters.network.bumping.v3.Bumping envoy.filters.thrift.header_to_metadata: categories: - envoy.thrift_proxy.filters diff --git a/source/extensions/filters/network/bumping/BUILD b/source/extensions/filters/network/bumping/BUILD new file mode 100644 index 000000000000..d83f04f8fc41 --- /dev/null +++ b/source/extensions/filters/network/bumping/BUILD @@ -0,0 +1,76 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_extension_package() + +envoy_cc_library( + name = "bumping_lib", + srcs = [ + "bumping.cc", + ], + hdrs = [ + "bumping.h", + ], + external_deps = [ + "ssl", + ], + deps = [ + "//envoy/access_log:access_log_interface", + "//envoy/buffer:buffer_interface", + "//envoy/common:time_interface", + "//envoy/event:dispatcher_interface", + "//envoy/network:connection_interface", + "//envoy/network:filter_interface", + "//envoy/router:router_interface", + "//envoy/server:filter_config_interface", + "//envoy/stats:stats_interface", + "//envoy/stats:stats_macros", + "//envoy/stats:timespan_interface", + "//envoy/stream_info:filter_state_interface", + "//envoy/tcp:conn_pool_interface", + "//envoy/tcp:upstream_interface", + "//envoy/upstream:cluster_manager_interface", + "//envoy/upstream:upstream_interface", + "//source/common/access_log:access_log_lib", + "//source/common/common:assert_lib", + "//source/common/common:empty_string", + "//source/common/common:macros", + "//source/common/common:minimal_logger_lib", + "//source/common/http:codec_client_lib", + "//source/common/network:application_protocol_lib", + "//source/common/network:cidr_range_lib", + "//source/common/network:filter_lib", + "//source/common/network:proxy_protocol_filter_state_lib", + "//source/common/network:socket_option_factory_lib", + "//source/common/network:transport_socket_options_lib", + "//source/common/network:upstream_server_name_lib", + "//source/common/network:upstream_socket_options_filter_state_lib", + "//source/common/network:utility_lib", + "//source/common/stream_info:stream_info_lib", + "//source/common/upstream:load_balancer_lib", + "//source/extensions/filters/network:well_known_names", + "//source/extensions/upstreams/tcp/generic:config", + "@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/filters/network/bumping/v3:pkg_cc_proto", + ], +) + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":bumping_lib", + "//envoy/registry", + "//source/common/tcp_proxy", + "//source/extensions/filters/network:well_known_names", + "//source/extensions/filters/network/common:factory_base_lib", + "@envoy_api//envoy/extensions/filters/network/bumping/v3:pkg_cc_proto", + ], +) \ No newline at end of file diff --git a/source/extensions/filters/network/bumping/bumping.cc b/source/extensions/filters/network/bumping/bumping.cc new file mode 100644 index 000000000000..611f86fa77f4 --- /dev/null +++ b/source/extensions/filters/network/bumping/bumping.cc @@ -0,0 +1,293 @@ +#include "source/extensions/filters/network/bumping/bumping.h" + +#include +#include +#include + +#include "envoy/buffer/buffer.h" +#include "envoy/config/accesslog/v3/accesslog.pb.h" +#include "envoy/event/dispatcher.h" +#include "envoy/event/timer.h" +#include "envoy/extensions/filters/network/bumping/v3/bumping.pb.h" +#include "envoy/stats/scope.h" +#include "envoy/upstream/cluster_manager.h" +#include "envoy/upstream/upstream.h" + +#include "source/common/access_log/access_log_impl.h" +#include "source/common/common/assert.h" +#include "source/common/common/empty_string.h" +#include "source/common/common/enum_to_int.h" +#include "source/common/common/fmt.h" +#include "source/common/common/macros.h" +#include "source/common/common/utility.h" +#include "source/common/config/utility.h" +#include "source/common/config/well_known_names.h" +#include "source/common/network/application_protocol.h" +#include "source/common/network/proxy_protocol_filter_state.h" +#include "source/common/network/socket_option_factory.h" +#include "source/common/network/transport_socket_options_impl.h" +#include "source/common/network/upstream_server_name.h" +#include "source/common/network/upstream_socket_options_filter_state.h" + +namespace Envoy { +namespace Bumping { + +Config::SimpleRouteImpl::SimpleRouteImpl(const Config& parent, absl::string_view cluster_name) + : parent_(parent), cluster_name_(cluster_name) {} + +Config::Config(const envoy::extensions::filters::network::bumping::v3::Bumping& config, + Server::Configuration::FactoryContext& context) + : stats_scope_(context.scope().createScope(fmt::format("bumping.{}", config.stat_prefix()))), + stats_(generateStats(*stats_scope_)), + max_connect_attempts_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_connect_attempts, 1)){ + if (!config.cluster().empty()) { + default_route_ = std::make_shared(*this, config.cluster()); + } + + for (const envoy::config::accesslog::v3::AccessLog& log_config : config.access_log()) { + access_logs_.emplace_back(AccessLog::AccessLogFactory::fromProto(log_config, context)); + } +} + +BumpingStats Config::generateStats(Stats::Scope& scope) { + return {ALL_BUMPING_STATS(POOL_COUNTER(scope))}; +} + +RouteConstSharedPtr Config::getRoute() { + if (default_route_ != nullptr) { + return default_route_; + } + + // no match, no more routes to try + return nullptr; +} + +Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager) + : config_(config), cluster_manager_(cluster_manager), + upstream_callbacks_(new UpstreamCallbacks(this)) { + ASSERT(config != nullptr); +} + +Filter::~Filter() { + for (const auto& access_log : config_->accessLogs()) { + access_log->log(nullptr, nullptr, nullptr, getStreamInfo()); + } +} + +void Filter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { + read_callbacks_ = &callbacks; + ENVOY_CONN_LOG(debug, "new bumping session", read_callbacks_->connection()); + + // Need to disable reads and writes to postpone downstream handshakes. + // This will get re-enabled when transport socket is refreshed with mimic cert. + //TODO, needs refactoring + read_callbacks_->connection().readDisable(true); + read_callbacks_->connection().write_disable = true; +} + +StreamInfo::StreamInfo& Filter::getStreamInfo() { + return read_callbacks_->connection().streamInfo(); +} + +void Filter::UpstreamCallbacks::onEvent(Network::ConnectionEvent event) { + if (event == Network::ConnectionEvent::Connected || + event == Network::ConnectionEvent::ConnectedZeroRtt) { + return; + } + parent_->onUpstreamEvent(event); +} + +Network::FilterStatus Filter::establishUpstreamConnection() { + const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING; + Upstream::ThreadLocalCluster* thread_local_cluster = + cluster_manager_.getThreadLocalCluster(cluster_name); + + if (!thread_local_cluster) { + ENVOY_CONN_LOG(debug, "Cluster not found {}.", + read_callbacks_->connection(), cluster_name); + config_->stats().downstream_cx_no_route_.inc(); + getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound); + onInitFailure(UpstreamFailureReason::NoRoute); + return Network::FilterStatus::StopIteration; + } + + ENVOY_CONN_LOG(debug, "Creating connection to cluster {}", read_callbacks_->connection(), + cluster_name); + + const Upstream::ClusterInfoConstSharedPtr& cluster = thread_local_cluster->info(); + + // Check this here because the TCP conn pool will queue our request waiting for a connection that + // will never be released. + if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) { + getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); + cluster->stats().upstream_cx_overflow_.inc(); + onInitFailure(UpstreamFailureReason::ResourceLimitExceeded); + return Network::FilterStatus::StopIteration; + } + + const uint32_t max_connect_attempts = config_->maxConnectAttempts(); + if (connect_attempts_ >= max_connect_attempts) { + getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded); + cluster->stats().upstream_cx_connect_attempts_exceeded_.inc(); + onInitFailure(UpstreamFailureReason::ConnectFailed); + return Network::FilterStatus::StopIteration; + } + + auto& downstream_connection = read_callbacks_->connection(); + auto& filter_state = downstream_connection.streamInfo().filterState(); + transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(*filter_state); + + if (auto typed_state = filter_state->getDataReadOnly( + Network::UpstreamSocketOptionsFilterState::key()); + typed_state != nullptr) { + auto downstream_options = typed_state->value(); + if (!upstream_options_) { + upstream_options_ = std::make_shared(); + } + Network::Socket::appendOptions(upstream_options_, downstream_options); + } + + if (!maybeTunnel(*thread_local_cluster)) { + // Either cluster is unknown or there are no healthy hosts. tcpConnPool() increments + // cluster->stats().upstream_cx_none_healthy in the latter case. + getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream); + onInitFailure(UpstreamFailureReason::NoHealthyUpstream); + } + return Network::FilterStatus::StopIteration; +} + +bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster) { + TcpProxy::GenericConnPoolFactory* factory = nullptr; + if (cluster.info()->upstreamConfig().has_value()) { + factory = Envoy::Config::Utility::getFactory( + cluster.info()->upstreamConfig().value()); + } else { + factory = Envoy::Config::Utility::getFactoryByName( + "envoy.filters.connection_pools.tcp.generic"); + } + if (!factory) { + return false; + } + + generic_conn_pool_ = factory->createGenericConnPool(cluster, TcpProxy::TunnelingConfigHelperOptConstRef(), + this, *upstream_callbacks_); + if (generic_conn_pool_) { + connecting_ = true; + connect_attempts_++; + getStreamInfo().setAttemptCount(connect_attempts_); + generic_conn_pool_->newStream(*this); + // Because we never return open connections to the pool, this either has a handle waiting on + // connection completion, or onPoolFailure has been invoked. Either way, stop iteration. + return true; + } + return false; +} + +void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, + absl::string_view failure_reason, + Upstream::HostDescriptionConstSharedPtr host) { + generic_conn_pool_.reset(); + read_callbacks_->upstreamHost(host); + getStreamInfo().upstreamInfo()->setUpstreamHost(host); + getStreamInfo().upstreamInfo()->setUpstreamTransportFailureReason(failure_reason); + + switch (reason) { + case ConnectionPool::PoolFailureReason::Overflow: + case ConnectionPool::PoolFailureReason::LocalConnectionFailure: + upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); + break; + case ConnectionPool::PoolFailureReason::RemoteConnectionFailure: + upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); + break; + case ConnectionPool::PoolFailureReason::Timeout: + onConnectTimeout(); + break; + } +} + +void Filter::onGenericPoolReady(StreamInfo::StreamInfo*, + std::unique_ptr&& upstream, + Upstream::HostDescriptionConstSharedPtr&, + const Network::Address::InstanceConstSharedPtr&, + Ssl::ConnectionInfoConstSharedPtr) { + + // Ssl::ConnectionInfoConstSharedPtr should contains enough info for cert mimicking + //TODO Cert mimick + upstream_ = std::move(upstream); + generic_conn_pool_.reset(); + onUpstreamConnection(); + read_callbacks_->continueReading(); +} + +void Filter::onConnectTimeout() { + ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection()); + getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamConnectionFailure); + + // Raise LocalClose, which will trigger a reconnect if needed/configured. + upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose); +} + +Network::FilterStatus Filter::onData(Buffer::Instance&, bool) { + return Network::FilterStatus::Continue; +} + +Network::FilterStatus Filter::onNewConnection() { + ASSERT(upstream_ == nullptr); + route_ = pickRoute(); + return establishUpstreamConnection(); +} + +void Filter::onUpstreamEvent(Network::ConnectionEvent event) { + if (event == Network::ConnectionEvent::ConnectedZeroRtt) { + return; + } + // Update the connecting flag before processing the event because we may start a new connection + // attempt in establishUpstreamConnection. + bool connecting = connecting_; + connecting_ = false; + + if (event == Network::ConnectionEvent::RemoteClose || + event == Network::ConnectionEvent::LocalClose) { + upstream_.reset(); + disableIdleTimer(); + + // happens in on onPoolFailure + if (connecting) { + if (event == Network::ConnectionEvent::RemoteClose) { + getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamConnectionFailure); + } + establishUpstreamConnection(); + } + } +} + +void Filter::onUpstreamConnection() { + connecting_ = false; + + //TODO, interact with Cert Provider + + // Re-enable downstream reads and writes now that the upstream connection is established + // so we have a place to send downstream data to. + read_callbacks_->connection().readDisable(false); + read_callbacks_->connection().write_disable = false; + + ENVOY_CONN_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}", + read_callbacks_->connection(), + getStreamInfo().downstreamAddressProvider().requestedServerName()); +} + +void Filter::resetIdleTimer() { + if (idle_timer_ != nullptr) { + ASSERT(config_->idleTimeout()); + idle_timer_->enableTimer(config_->idleTimeout().value()); + } +} + +void Filter::disableIdleTimer() { + if (idle_timer_ != nullptr) { + idle_timer_->disableTimer(); + idle_timer_.reset(); + } +} +} // namespace Bumping +} // namespace Envoy \ No newline at end of file diff --git a/source/extensions/filters/network/bumping/bumping.h b/source/extensions/filters/network/bumping/bumping.h new file mode 100644 index 000000000000..d6dcc253fafa --- /dev/null +++ b/source/extensions/filters/network/bumping/bumping.h @@ -0,0 +1,224 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/access_log/access_log.h" +#include "envoy/common/random_generator.h" +#include "envoy/event/timer.h" +#include "envoy/extensions/filters/network/bumping/v3/bumping.pb.h" +#include "envoy/http/header_evaluator.h" +#include "envoy/network/connection.h" +#include "envoy/network/filter.h" +#include "envoy/runtime/runtime.h" +#include "envoy/server/filter_config.h" +#include "envoy/stats/scope.h" +#include "envoy/stats/stats_macros.h" +#include "envoy/stats/timespan.h" +#include "envoy/stream_info/filter_state.h" +#include "envoy/tcp/upstream.h" +#include "envoy/upstream/cluster_manager.h" +#include "envoy/upstream/upstream.h" + +#include "source/common/common/logger.h" +#include "source/common/network/cidr_range.h" +#include "source/common/network/filter_impl.h" +#include "source/common/network/utility.h" +#include "source/common/stream_info/stream_info_impl.h" +#include "source/common/upstream/load_balancer_impl.h" + +namespace Envoy { +namespace Bumping { + +/** + * All bumping stats. @see stats_macros.h + */ +#define ALL_BUMPING_STATS(COUNTER) \ + COUNTER(downstream_cx_no_route) +/** + * Struct definition for all bumping stats. @see stats_macros.h + */ +struct BumpingStats { + ALL_BUMPING_STATS(GENERATE_COUNTER_STRUCT); +}; + +/** + * Route is an individual resolved route for a connection. + */ +class Route { +public: + virtual ~Route() = default; + + /** + * Check whether this route matches a given connection. + * @param connection supplies the connection to test against. + * @return bool true if this route matches a given connection. + */ + virtual bool matches(Network::Connection& connection) const PURE; + + /** + * @return const std::string& the upstream cluster that owns the route. + */ + virtual const std::string& clusterName() const PURE; +}; + +using RouteConstSharedPtr = std::shared_ptr; + +/** + * Filter configuration. + * + * This configuration holds a TLS slot, and therefore it must be destructed + * on the main thread. + */ +class Config { +public: + Config(const envoy::extensions::filters::network::bumping::v3::Bumping& config, + Server::Configuration::FactoryContext& context); + + /** + * Find out which cluster an upstream connection should be opened to based on the + * parameters of a downstream connection. + * @param connection supplies the parameters of the downstream connection for + * which the proxy needs to open the corresponding upstream. + * @return the route to be used for the upstream connection. + * If no route applies, returns nullptr. + */ + RouteConstSharedPtr getRoute(); + const BumpingStats& stats() { return stats_; } + const std::vector& accessLogs() { return access_logs_; } + uint32_t maxConnectAttempts() const { return max_connect_attempts_; } + const absl::optional& idleTimeout() { + return idle_timeout_; + } +private: + struct SimpleRouteImpl : public Route { + SimpleRouteImpl(const Config& parent, absl::string_view cluster_name); + + // Route + bool matches(Network::Connection&) const override { return true; } + const std::string& clusterName() const override { return cluster_name_; } + + const Config& parent_; + std::string cluster_name_; + }; + RouteConstSharedPtr default_route_; + + static BumpingStats generateStats(Stats::Scope& scope); + + std::vector access_logs_; + absl::optional idle_timeout_; + const Stats::ScopeSharedPtr stats_scope_; + const BumpingStats stats_; + const uint32_t max_connect_attempts_; + absl::optional max_downstream_connection_duration_; +}; + +using ConfigSharedPtr = std::shared_ptr; + +/** + * An implementation of a Bumping filter. This filter will instantiate a new outgoing TCP + * connection using TCP connection pool for the configured cluster. The established connection + * is only used for getting upstream cert, no data exchange happens. + */ +class Filter : public Network::ReadFilter, + public Upstream::LoadBalancerContextBase, + protected Logger::Loggable, + public TcpProxy::GenericConnectionPoolCallbacks { +public: + Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager); + ~Filter() override; + + // Network::ReadFilter + Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; + Network::FilterStatus onNewConnection() override; + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; + + // GenericConnectionPoolCallbacks + void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr&& upstream, + Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info) override; + void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, + absl::string_view failure_reason, + Upstream::HostDescriptionConstSharedPtr host) override; + + // Upstream::LoadBalancerContext + const Router::MetadataMatchCriteria* metadataMatchCriteria() override { return nullptr; } + absl::optional computeHashKey() override { + return {}; + } + const Network::Connection* downstreamConnection() const override { + return &read_callbacks_->connection(); + } + + struct UpstreamCallbacks : public Tcp::ConnectionPool::UpstreamCallbacks { + UpstreamCallbacks(Filter* parent) : parent_(parent) {} + + // Tcp::ConnectionPool::UpstreamCallbacks + void onUpstreamData(Buffer::Instance&, bool) override {}; + void onEvent(Network::ConnectionEvent event) override; + void onAboveWriteBufferHighWatermark() override {}; + void onBelowWriteBufferLowWatermark() override {}; + + Filter* parent_{}; + }; + + StreamInfo::StreamInfo& getStreamInfo(); + +protected: + enum class UpstreamFailureReason { + ConnectFailed, + NoHealthyUpstream, + ResourceLimitExceeded, + NoRoute, + }; + + virtual RouteConstSharedPtr pickRoute() { + return config_->getRoute(); + } + + virtual void onInitFailure(UpstreamFailureReason) { + read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush); + } + + // Create connection to the upstream cluster. This function can be repeatedly called on upstream + // connection failure. + Network::FilterStatus establishUpstreamConnection(); + + // The callback upon on demand cluster discovery response. + //void onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status); + + bool maybeTunnel(Upstream::ThreadLocalCluster& cluster); + void onConnectTimeout(); + void onUpstreamEvent(Network::ConnectionEvent event); + void onUpstreamConnection(); + void onIdleTimeout(); + void resetIdleTimer(); + void disableIdleTimer(); + + const ConfigSharedPtr config_; + Upstream::ClusterManager& cluster_manager_; + Network::ReadFilterCallbacks* read_callbacks_{}; + + Event::TimerPtr idle_timer_; + Event::TimerPtr connection_duration_timer_; + + std::shared_ptr upstream_callbacks_; // shared_ptr required for passing as a + // read filter. + // The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist + // until either the upstream or downstream connection is terminated. + std::unique_ptr upstream_; + // The connection pool used to set up |upstream_|. + // This will be non-null from when an upstream connection is attempted until + // it either succeeds or fails. + std::unique_ptr generic_conn_pool_; + RouteConstSharedPtr route_; + Network::TransportSocketOptionsConstSharedPtr transport_socket_options_; + Network::Socket::OptionsSharedPtr upstream_options_; + uint32_t connect_attempts_{}; + bool connecting_{}; +}; +} // namespace Bumping +} // namespace Envoy diff --git a/source/extensions/filters/network/bumping/config.cc b/source/extensions/filters/network/bumping/config.cc new file mode 100644 index 000000000000..33757d5b1207 --- /dev/null +++ b/source/extensions/filters/network/bumping/config.cc @@ -0,0 +1,36 @@ +#include "source/extensions/filters/network/bumping/config.h" + +#include "envoy/extensions/filters/network/bumping/v3/bumping.pb.h" +#include "envoy/extensions/filters/network/bumping/v3/bumping.pb.validate.h" +#include "envoy/registry/registry.h" + +#include "source/extensions/filters/network/bumping/bumping.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Bumping { + +Network::FilterFactoryCb ConfigFactory::createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::network::bumping::v3::Bumping& proto_config, + Server::Configuration::FactoryContext& context) { + ASSERT(!proto_config.stat_prefix().empty()); + + Envoy::Bumping::ConfigSharedPtr filter_config( + std::make_shared(proto_config, context)); + return [filter_config, &context](Network::FilterManager& filter_manager) -> void { + filter_manager.addReadFilter( + std::make_shared(filter_config, context.clusterManager())); + }; +} + +/** + * Static registration for the bumping filter. @see RegisterFactory. + */ +REGISTER_FACTORY(ConfigFactory, + Server::Configuration::NamedNetworkFilterConfigFactory){"envoy.bumping"}; + +} // namespace Bumping +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/bumping/config.h b/source/extensions/filters/network/bumping/config.h new file mode 100644 index 000000000000..00b6e62676ef --- /dev/null +++ b/source/extensions/filters/network/bumping/config.h @@ -0,0 +1,31 @@ +#pragma once + +#include "envoy/extensions/filters/network/bumping/v3/bumping.pb.h" +#include "envoy/extensions/filters/network/bumping/v3/bumping.pb.validate.h" + +#include "source/extensions/filters/network/common/factory_base.h" +#include "source/extensions/filters/network/well_known_names.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Bumping { + +/** + * Config registration for the bumping filter. @see NamedNetworkFilterConfigFactory. + */ + +class ConfigFactory + : public Common::FactoryBase { +public: + ConfigFactory() : FactoryBase(NetworkFilterNames::get().Bumping, false) {} +private: + Network::FilterFactoryCb createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::network::bumping::v3::Bumping& proto_config, + Server::Configuration::FactoryContext& context) override; +}; + +} // namespace Bumping +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/well_known_names.h b/source/extensions/filters/network/well_known_names.h index 0017db6f1ff7..6c2924414aa4 100644 --- a/source/extensions/filters/network/well_known_names.h +++ b/source/extensions/filters/network/well_known_names.h @@ -59,6 +59,8 @@ class NetworkFilterNameValues { const std::string ZooKeeperProxy = "envoy.filters.network.zookeeper_proxy"; // WebAssembly filter const std::string Wasm = "envoy.filters.network.wasm"; + // Bumping filter + const std::string Bumping = "envoy.filters.network.bumping"; }; using NetworkFilterNames = ConstSingleton;