Skip to content

Commit

Permalink
DFP: Track connections in the DFP LoadBalancer (#17874)
Browse files Browse the repository at this point in the history
Widen the Upstream::LoadBalancer interface to add methods which allow
a load balancer to optionally observe connection lifetime events and to
select a specific connection based on those events.

Implement this behavior in the DFP LoadBalancer to reuse HTTP/2 and HTTP/3
connections when the hash key, IP address, and certificate all match, conditional
on a new allow_coalesced_connections field in the DFP config.

Risk Level: Low
Testing: Unit

Signed-off-by: Ryan Hamilton <[email protected]>
  • Loading branch information
RyanTheOptimist authored Oct 21, 2021
1 parent 04ad439 commit b404746
Show file tree
Hide file tree
Showing 21 changed files with 671 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,26 @@ message ClusterConfig {
// in the :ref:`cluster's upstream_http_protocol_options
// <envoy_v3_api_field_config.cluster.v3.Cluster.upstream_http_protocol_options>`
bool allow_insecure_cluster_options = 2;

// [#not-implemented-hide:]
// If true allow HTTP/2 and HTTP/3 connections to be reused for requests to different
// origins than the connection was initially created for. This will only happen when the
// resolved address for the new connection matches the peer address of the connection and
// the TLS certificate is also valid for the new hostname. For example, if a connection
// has previously been established to foo.example.com at IP 1.2.3.4 with a certificate
// that is valid for `*.example.com`, then this connection could be used for requests to
// bar.example.com if that also resolved to 1.2.3.4.
//
// .. note::
// By design, this feature will maximize reuse of connections. This means that instead
// opening a new connection when an existing connection reaches the maximum number of
// concurrent streams, requests will instead be sent to the existing connection.
// TODO(alyssawilk) implement request queueing in connections.
//
// .. note::
// The coalesced connections might be to upstreams that would not be otherwise
// selected by Envoy. See the section `Connection Reuse in RFC 7540
// <https://datatracker.ietf.org/doc/html/rfc7540#section-9.1.1>`_
//
bool allow_coalesced_connections = 3;
}
28 changes: 28 additions & 0 deletions envoy/http/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,34 @@ class Callbacks {
absl::optional<Http::Protocol> protocol) PURE;
};

class Instance;

/**
* Pool callbacks invoked to track the lifetime of connections in the pool.
*/
class ConnectionLifetimeCallbacks {
public:
virtual ~ConnectionLifetimeCallbacks() = default;

/**
* Called when a connection is open for requests in a pool.
* @param pool which the connection is associated with.
* @param hash_key the hash key used for this connection.
* @param connection newly open connection.
*/
virtual void onConnectionOpen(Instance& pool, std::vector<uint8_t>& hash_key,
const Network::Connection& connection) PURE;

/**
* Called when a connection is draining and may no longer be used for requests.
* @param pool which the connection is associated with.
* @param hash_key the hash key used for this connection.
* @param connection newly open connection.
*/
virtual void onConnectionDraining(Instance& pool, std::vector<uint8_t>& hash_key,
const Network::Connection& connection) PURE;
};

/**
* An instance of a generic connection pool.
*/
Expand Down
30 changes: 30 additions & 0 deletions envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
#include "envoy/upstream/upstream.h"

namespace Envoy {
namespace Http {
namespace ConnectionPool {
class ConnectionLifetimeCallbacks;
} // namespace ConnectionPool
} // namespace Http
namespace Upstream {

/**
Expand Down Expand Up @@ -104,6 +109,14 @@ class LoadBalancerContext {
virtual absl::optional<OverrideHost> overrideHostToSelect() const PURE;
};

/**
* Identifies a specific connection within a pool.
*/
struct SelectedPoolAndConnection {
Envoy::ConnectionPool::Instance& pool_;
const Network::Connection& connection_;
};

/**
* Abstract load balancing interface.
*/
Expand All @@ -126,6 +139,23 @@ class LoadBalancer {
* @param context supplies the context which is used in host selection.
*/
virtual HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) PURE;

/**
* Returns connnection lifetime callbacks that may be used to inform the load balancer of
* connection events. Load balancers which do not intend to track connection lifetime events
* will return nullopt.
* @return optional lifetime callbacks for this load balancer.
*/
virtual OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() PURE;

/**
* Returns a specific pool and existing connection to be used for the specified host.
*
* @return selected pool and connection to be used, or nullopt if no selection is made,
* for example if no matching connection is found.
*/
virtual absl::optional<SelectedPoolAndConnection>
selectPool(LoadBalancerContext* context, const Host& host, std::vector<uint8_t>& hash_key) PURE;
};

using LoadBalancerPtr = std::unique_ptr<LoadBalancer>;
Expand Down
11 changes: 11 additions & 0 deletions source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ class LoadBalancerBase : public LoadBalancer {
choosePriority(uint64_t hash, const HealthyLoad& healthy_per_priority_load,
const DegradedLoad& degraded_per_priority_load);

// Pool selection not implemented.
absl::optional<Upstream::SelectedPoolAndConnection>
selectPool(Upstream::LoadBalancerContext* /*context*/, const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override {
return absl::nullopt;
}
// Lifetime tracking not implemented.
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
return {};
}

protected:
/**
* For the given host_set @return if we should be in a panic mode or not. For example, if the
Expand Down
10 changes: 10 additions & 0 deletions source/common/upstream/original_dst_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ class OriginalDstCluster : public ClusterImplBase {
HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
// Preconnecting is not implemented for OriginalDstCluster
HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
// Pool selection not implemented for OriginalDstCluster
absl::optional<Upstream::SelectedPoolAndConnection>
selectPool(Upstream::LoadBalancerContext* /*context*/, const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override {
return absl::nullopt;
}
// Lifetime tracking not implemented for OriginalDstCluster
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
return {};
}

private:
Network::Address::InstanceConstSharedPtr requestOverrideHost(LoadBalancerContext* context);
Expand Down
10 changes: 10 additions & 0 deletions source/common/upstream/subset_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::Loggable<Logger::Id::ups
HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
// TODO(alyssawilk) implement for non-metadata match.
HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
// Pool selection not implemented.
absl::optional<Upstream::SelectedPoolAndConnection>
selectPool(Upstream::LoadBalancerContext* /*context*/, const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override {
return absl::nullopt;
}
// Lifetime tracking not implemented.
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
return {};
}

private:
using HostPredicate = std::function<bool(const Host&)>;
Expand Down
18 changes: 18 additions & 0 deletions source/common/upstream/thread_aware_lb_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
HostConstSharedPtr chooseHost(LoadBalancerContext*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
// Preconnect not implemented for hash based load balancing
HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
// Pool selection not implemented.
absl::optional<Upstream::SelectedPoolAndConnection>
selectPool(Upstream::LoadBalancerContext* /*context*/, const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override {
return absl::nullopt;
}
// Lifetime tracking not implemented.
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
return {};
}

protected:
ThreadAwareLoadBalancerBase(
Expand All @@ -115,6 +125,14 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
// Preconnect not implemented for hash based load balancing
HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
absl::optional<Upstream::SelectedPoolAndConnection>
selectPool(Upstream::LoadBalancerContext* /*context*/, const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override {
return absl::nullopt;
}
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
return {};
}

ClusterStats& stats_;
Random::RandomGenerator& random_;
Expand Down
26 changes: 26 additions & 0 deletions source/extensions/clusters/aggregate/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,32 @@ AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context)
return nullptr;
}

Upstream::HostConstSharedPtr
AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) {
if (load_balancer_) {
return load_balancer_->peekAnotherHost(context);
}
return nullptr;
}

absl::optional<Upstream::SelectedPoolAndConnection>
AggregateClusterLoadBalancer::selectPool(Upstream::LoadBalancerContext* context,
const Upstream::Host& host,
std::vector<uint8_t>& hash_key) {
if (load_balancer_) {
return load_balancer_->selectPool(context, host, hash_key);
}
return absl::nullopt;
}

OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
AggregateClusterLoadBalancer::lifetimeCallbacks() {
if (load_balancer_) {
return load_balancer_->lifetimeCallbacks();
}
return {};
}

std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>
ClusterFactory::createClusterWithConfig(
const envoy::config::cluster::v3::Cluster& cluster,
Expand Down
17 changes: 13 additions & 4 deletions source/extensions/clusters/aggregate/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,

// Upstream::LoadBalancer
Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
// Preconnecting not yet implemented for extensions.
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
return nullptr;
}
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override;
absl::optional<Upstream::SelectedPoolAndConnection>
selectPool(Upstream::LoadBalancerContext* /*context*/, const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override;
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override;

private:
// Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the
Expand All @@ -101,6 +102,14 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
return nullptr;
}
absl::optional<Upstream::SelectedPoolAndConnection>
selectPool(Upstream::LoadBalancerContext* /*context*/, const Upstream::Host& /*host*/,
std::vector<uint8_t>& /*hash_key*/) override {
return {};
}
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
return {};
}

absl::optional<uint32_t> hostToLinearizedPriority(const Upstream::HostDescription& host) const;

Expand Down
1 change: 1 addition & 0 deletions source/extensions/clusters/dynamic_forward_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ envoy_cc_extension(
"//source/extensions/common/dynamic_forward_proxy:dns_cache_interface",
"//source/extensions/common/dynamic_forward_proxy:dns_cache_manager_impl",
"//source/extensions/filters/network/common:utility_lib",
"//source/extensions/transport_sockets/tls/cert_validator:cert_validator_lib",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/clusters/dynamic_forward_proxy/v3:pkg_cc_proto",
Expand Down
72 changes: 71 additions & 1 deletion source/extensions/clusters/dynamic_forward_proxy/cluster.cc
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#include "source/extensions/clusters/dynamic_forward_proxy/cluster.h"

#include <algorithm>

#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.h"
#include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.validate.h"

#include "source/common/http/utility.h"
#include "source/common/network/transport_socket_options_impl.h"
#include "source/extensions/common/dynamic_forward_proxy/dns_cache_manager_impl.h"
#include "source/extensions/filters/network/common/utility.h"
#include "source/extensions/transport_sockets/tls/cert_validator/default_validator.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -26,7 +30,8 @@ Cluster::Cluster(
factory_context.mainThreadDispatcher().timeSource()),
dns_cache_manager_(cache_manager_factory.get()),
dns_cache_(dns_cache_manager_->getCache(config.dns_cache_config())),
update_callbacks_handle_(dns_cache_->addUpdateCallbacks(*this)), local_info_(local_info) {}
update_callbacks_handle_(dns_cache_->addUpdateCallbacks(*this)), local_info_(local_info),
allow_coalesced_connections_(config.allow_coalesced_connections()) {}

void Cluster::startPreInit() {
// If we are attaching to a pre-populated cache we need to initialize our hosts.
Expand Down Expand Up @@ -171,6 +176,71 @@ Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
}
}

absl::optional<Upstream::SelectedPoolAndConnection>
Cluster::LoadBalancer::selectPool(Upstream::LoadBalancerContext* /*context*/,
const Upstream::Host& host, std::vector<uint8_t>& hash_key) {
const std::string& hostname = host.hostname();
if (hostname.empty()) {
return absl::nullopt;
}

LookupKey key = {hash_key, *host.address()};
auto it = connection_info_map_.find(key);
if (it == connection_info_map_.end()) {
return absl::nullopt;
}

for (auto& info : it->second) {
Envoy::Ssl::ConnectionInfoConstSharedPtr ssl = info.connection_->ssl();
ASSERT(ssl);
for (const std::string& san : ssl->dnsSansPeerCertificate()) {
if (Extensions::TransportSockets::Tls::DefaultCertValidator::dnsNameMatch(hostname, san)) {
return Upstream::SelectedPoolAndConnection{*info.pool_, *info.connection_};
}
}
}

return absl::nullopt;
}

OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
Cluster::LoadBalancer::lifetimeCallbacks() {
if (!cluster_.allowCoalescedConnections()) {
return {};
}
return makeOptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>(*this);
}

void Cluster::LoadBalancer::onConnectionOpen(Envoy::Http::ConnectionPool::Instance& pool,
std::vector<uint8_t>& hash_key,
const Network::Connection& connection) {
// Only coalesce connections that are over TLS.
if (!connection.ssl()) {
return;
}
const std::string alpn = connection.nextProtocol();
if (alpn != Http::Utility::AlpnNames::get().Http2 &&
alpn != Http::Utility::AlpnNames::get().Http3) {
// Only coalesce connections for HTTP/2 and HTTP/3.
return;
}
const LookupKey key = {hash_key, *connection.connectionInfoProvider().remoteAddress()};
ConnectionInfo info = {&pool, &connection};
connection_info_map_[key].push_back(info);
}

void Cluster::LoadBalancer::onConnectionDraining(Envoy::Http::ConnectionPool::Instance& pool,
std::vector<uint8_t>& hash_key,
const Network::Connection& connection) {
const LookupKey key = {hash_key, *connection.connectionInfoProvider().remoteAddress()};
connection_info_map_[key].erase(
std::remove_if(connection_info_map_[key].begin(), connection_info_map_[key].end(),
[&pool, &connection](const ConnectionInfo& info) {
return (info.pool_ == &pool && info.connection_ == &connection);
}),
connection_info_map_[key].end());
}

std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>
ClusterFactory::createClusterWithConfig(
const envoy::config::cluster::v3::Cluster& cluster,
Expand Down
Loading

0 comments on commit b404746

Please sign in to comment.