Revert connection pool cleanup (#17319)
This reverts commit 3c266bb.
This reverts commit 3876d7c.

Signed-off-by: Greg Greenway <[email protected]>
ggreenway authored Jul 13, 2021
1 parent 4b7389a · commit b7bc539
Showing 45 changed files with 400 additions and 1,079 deletions.
@@ -56,6 +56,10 @@ conjunction with the
:ref:`Original Src Listener Filter <arch_overview_ip_transparency_original_src_listener>`. Finally,
Envoy supports generating this header using the :ref:`Proxy Protocol Transport Socket <extension_envoy.transport_sockets.upstream_proxy_protocol>`.

IMPORTANT: There is currently a memory `issue <https://github.com/envoyproxy/envoy/issues/16682>`_ in Envoy where upstream connection pools are
not cleaned up after they are created. This heavily affects the usage of this transport socket, because a new pool is created for every downstream client
IP and port pair. Removing a cluster cleans up its associated connection pools, which can serve as a workaround for this issue in the current state.

Here is an example config for setting up the socket:

.. code-block:: yaml
1 change: 0 additions & 1 deletion docs/root/version_history/current.rst
@@ -49,7 +49,6 @@ Bug Fixes
*Changes expected to improve the state of the world and are unlikely to have negative effects*

* aws_lambda: if ``payload_passthrough`` is set to ``false``, the downstream response content-type header will now be set from the content-type entry in the JSON response's headers map, if present.
* cluster: delete pools when they're idle to fix unbounded memory use when using PROXY protocol upstream with tcp_proxy. This behavior can be temporarily reverted by setting the ``envoy.reloadable_features.conn_pool_delete_when_idle`` runtime guard to false.
* cluster: fixed the :ref:`cluster stats <config_cluster_manager_cluster_stats_request_response_sizes>` histograms by moving the accounting into the router
filter. This means that we now properly compute the number of bytes sent, as well as handle retries, which were previously ignored.
* hot_restart: fix double counting of ``server.seconds_until_first_ocsp_response_expiring`` and ``server.days_until_first_cert_expiring`` during hot-restart. This stat was only incorrect until the parent process terminated.
24 changes: 7 additions & 17 deletions envoy/common/conn_pool.h
@@ -44,27 +44,17 @@ class Instance {
virtual ~Instance() = default;

/**
* Called when a connection pool has no pending streams, busy connections, or ready connections.
* Called when a connection pool has been drained of pending streams, busy connections, and
* ready connections.
*/
using IdleCb = std::function<void()>;
using DrainedCb = std::function<void()>;

/**
* Register a callback that gets called when the connection pool is fully idle.
* Register a callback that gets called when the connection pool is fully drained and kicks
* off a drain. The owner of the connection pool is responsible for not creating any
* new streams.
*/
virtual void addIdleCallback(IdleCb cb) PURE;

/**
* Returns true if the pool does not have any connections or pending requests.
*/
virtual bool isIdle() const PURE;

/**
* Starts draining a pool, by gracefully completing all requests and gracefully closing all
* connections, in preparation for deletion. When the process completes, the function registered
* via `addIdleCallback()` is called. The callback may occur before this call returns if the pool
* can be immediately drained.
*/
virtual void startDrain() PURE;
virtual void addDrainedCallback(DrainedCb cb) PURE;

/**
* Actively drain all existing connection pool connections. This method can be used in cases
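For orientation, here is a minimal, self-contained sketch of the drained-callback contract that the hunk above restores in envoy/common/conn_pool.h. It is not Envoy code: FakePool, active_streams_, and the main() driver are illustrative assumptions; only the contract is mirrored (register a callback, create no new streams, and every registered callback fires once the pool has no pending streams, busy connections, or ready connections).

```cpp
// Minimal sketch of the drained-callback contract; FakePool is illustrative, not Envoy code.
#include <cstddef>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

class FakePool {
public:
  using DrainedCb = std::function<void()>;

  // Mirrors addDrainedCallback(): registering a callback also kicks off draining.
  void addDrainedCallback(DrainedCb cb) {
    drained_callbacks_.push_back(std::move(cb));
    checkForDrained();
  }

  void startStream() { ++active_streams_; }

  void onStreamFinished() {
    if (active_streams_ > 0) {
      --active_streams_;
    }
    checkForDrained();
  }

private:
  // Fires every registered callback once the pool holds no active work.
  void checkForDrained() {
    if (drained_callbacks_.empty() || active_streams_ != 0) {
      return;
    }
    for (const DrainedCb& cb : drained_callbacks_) {
      cb();
    }
    drained_callbacks_.clear();
  }

  std::size_t active_streams_{0};
  std::vector<DrainedCb> drained_callbacks_;
};

int main() {
  FakePool pool;
  pool.startStream();
  // The owner decides to drain: register the callback and create no new streams.
  pool.addDrainedCallback([] { std::cout << "pool drained, safe to delete\n"; });
  pool.onStreamFinished(); // Last stream completes; the callback fires here.
}
```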
6 changes: 0 additions & 6 deletions envoy/event/deferred_deletable.h
@@ -13,12 +13,6 @@ namespace Event {
class DeferredDeletable {
public:
virtual ~DeferredDeletable() = default;

/**
* Called when an object is passed to `deferredDelete`. This signals that the object will soon
* be deleted.
*/
virtual void deleteIsPending() {}
};

using DeferredDeletablePtr = std::unique_ptr<DeferredDeletable>;
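As a side note, a minimal self-contained sketch of the deferred-deletion pattern that DeferredDeletable supports: ownership is handed to the dispatcher, which destroys the object later, once no callback referencing it is still on the stack. FakeDispatcher and Connection are illustrative assumptions, not the real Event::Dispatcher; the deleteIsPending() hook removed above was an extra notification fired at hand-off time.

```cpp
// Minimal sketch of deferred deletion; FakeDispatcher and Connection are illustrative, not Envoy code.
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

class DeferredDeletable {
public:
  virtual ~DeferredDeletable() = default;
};

class FakeDispatcher {
public:
  // Takes ownership; the object stays alive until clearDeferredDeleteList() runs.
  void deferredDelete(std::unique_ptr<DeferredDeletable> to_delete) {
    to_delete_.push_back(std::move(to_delete));
  }

  // In a real event loop this runs at a safe point, after the current callbacks unwind.
  void clearDeferredDeleteList() { to_delete_.clear(); }

private:
  std::vector<std::unique_ptr<DeferredDeletable>> to_delete_;
};

class Connection : public DeferredDeletable {
public:
  ~Connection() override { std::cout << "connection destroyed\n"; }
};

int main() {
  FakeDispatcher dispatcher;
  dispatcher.deferredDelete(std::make_unique<Connection>()); // Not destroyed yet.
  std::cout << "the current callback can still unwind safely\n";
  dispatcher.clearDeferredDeleteList(); // Destroyed here, at a safe point.
}
```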
4 changes: 3 additions & 1 deletion envoy/upstream/thread_local_cluster.h
@@ -31,7 +31,9 @@ class HttpPoolData {
/**
* See documentation of Envoy::ConnectionPool::Instance.
*/
void addIdleCallback(ConnectionPool::Instance::IdleCb cb) { pool_->addIdleCallback(cb); };
void addDrainedCallback(ConnectionPool::Instance::DrainedCb cb) {
pool_->addDrainedCallback(cb);
};

Upstream::HostDescriptionConstSharedPtr host() const { return pool_->host(); }

38 changes: 0 additions & 38 deletions source/common/config/grpc_mux_impl.cc
@@ -14,35 +14,6 @@
namespace Envoy {
namespace Config {

namespace {
class AllMuxesState {
public:
void insert(GrpcMuxImpl* mux) {
absl::WriterMutexLock locker(&lock_);
muxes_.insert(mux);
}

void erase(GrpcMuxImpl* mux) {
absl::WriterMutexLock locker(&lock_);
muxes_.erase(mux);
}

void shutdownAll() {
absl::WriterMutexLock locker(&lock_);
for (auto& mux : muxes_) {
mux->shutdown();
}
}

private:
absl::flat_hash_set<GrpcMuxImpl*> muxes_ ABSL_GUARDED_BY(lock_);

// TODO(ggreenway): can this lock be removed? Is this code only run on the main thread?
absl::Mutex lock_;
};
using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
} // namespace

GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
@@ -59,13 +30,8 @@ GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
onDynamicContextUpdate(resource_type_url);
})) {
Config::Utility::checkLocalInfo("ads", local_info);
AllMuxes::get().insert(this);
}

GrpcMuxImpl::~GrpcMuxImpl() { AllMuxes::get().erase(this); }

void GrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }

void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
auto api_state = api_state_.find(resource_type_url);
if (api_state == api_state_.end()) {
@@ -78,10 +44,6 @@ void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
void GrpcMuxImpl::start() { grpc_stream_.establishNewStream(); }

void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
if (shutdown_) {
return;
}

ApiState& api_state = apiStateFor(type_url);
auto& request = api_state.request_;
request.mutable_resource_names()->Clear();
15 changes: 0 additions & 15 deletions source/common/config/grpc_mux_impl.h
@@ -39,17 +39,6 @@ class GrpcMuxImpl : public GrpcMux,
Random::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node);

~GrpcMuxImpl() override;

// Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
// on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
// preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
// would then cause all `GrpcMuxImpl` to be destructed.
// TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
static void shutdownAll();

void shutdown() { shutdown_ = true; }

void start() override;

// GrpcMux
@@ -190,10 +179,6 @@

Event::Dispatcher& dispatcher_;
Common::CallbackHandlePtr dynamic_update_callback_handle_;

// True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
// true because it may contain dangling pointers.
std::atomic<bool> shutdown_{false};
};

using GrpcMuxImplPtr = std::unique_ptr<GrpcMuxImpl>;
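To see the pieces removed from grpc_mux_impl.{cc,h} together, here is a minimal self-contained sketch of the pattern: a process-wide registry of live muxes, a shutdownAll() that flips an atomic flag on each of them, and a guard that drops outgoing requests once the flag is set so nothing is sent through a stream that may hold dangling pointers. FakeMux and its members are illustrative assumptions, not Envoy code.

```cpp
// Minimal sketch of the registry + shutdown-flag pattern; FakeMux is illustrative, not Envoy code.
#include <atomic>
#include <iostream>
#include <mutex>
#include <set>

class FakeMux {
public:
  FakeMux() {
    std::lock_guard<std::mutex> lock(lock_);
    muxes_.insert(this);
  }
  ~FakeMux() {
    std::lock_guard<std::mutex> lock(lock_);
    muxes_.erase(this);
  }

  // Flip the shutdown flag on every live mux; called once when the server shuts down.
  static void shutdownAll() {
    std::lock_guard<std::mutex> lock(lock_);
    for (FakeMux* mux : muxes_) {
      mux->shutdown_ = true;
    }
  }

  void sendDiscoveryRequest() {
    if (shutdown_) {
      return; // The stream may already reference freed objects; send nothing.
    }
    std::cout << "discovery request sent\n";
  }

private:
  std::atomic<bool> shutdown_{false};
  static std::mutex lock_;
  static std::set<FakeMux*> muxes_;
};

std::mutex FakeMux::lock_;
std::set<FakeMux*> FakeMux::muxes_;

int main() {
  FakeMux mux;
  mux.sendDiscoveryRequest(); // Sent normally.
  FakeMux::shutdownAll();
  mux.sendDiscoveryRequest(); // Dropped: the flag is set.
}
```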
41 changes: 1 addition & 40 deletions source/common/config/new_grpc_mux_impl.cc
@@ -16,35 +16,6 @@
namespace Envoy {
namespace Config {

namespace {
class AllMuxesState {
public:
void insert(NewGrpcMuxImpl* mux) {
absl::WriterMutexLock locker(&lock_);
muxes_.insert(mux);
}

void erase(NewGrpcMuxImpl* mux) {
absl::WriterMutexLock locker(&lock_);
muxes_.erase(mux);
}

void shutdownAll() {
absl::WriterMutexLock locker(&lock_);
for (auto& mux : muxes_) {
mux->shutdown();
}
}

private:
absl::flat_hash_set<NewGrpcMuxImpl*> muxes_ ABSL_GUARDED_BY(lock_);

// TODO(ggreenway): can this lock be removed? Is this code only run on the main thread?
absl::Mutex lock_;
};
using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
} // namespace

NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client,
Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
@@ -59,13 +30,7 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client,
[this](absl::string_view resource_type_url) {
onDynamicContextUpdate(resource_type_url);
})),
transport_api_version_(transport_api_version), dispatcher_(dispatcher) {
AllMuxes::get().insert(this);
}

NewGrpcMuxImpl::~NewGrpcMuxImpl() { AllMuxes::get().erase(this); }

void NewGrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }
transport_api_version_(transport_api_version), dispatcher_(dispatcher) {}

void NewGrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
auto sub = subscriptions_.find(resource_type_url);
@@ -251,10 +216,6 @@ void NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use
}

void NewGrpcMuxImpl::trySendDiscoveryRequests() {
if (shutdown_) {
return;
}

while (true) {
// Do any of our subscriptions even want to send a request?
absl::optional<std::string> maybe_request_type = whoWantsToSendDiscoveryRequest();
15 changes: 0 additions & 15 deletions source/common/config/new_grpc_mux_impl.h
@@ -38,17 +38,6 @@ class NewGrpcMuxImpl
const RateLimitSettings& rate_limit_settings,
const LocalInfo::LocalInfo& local_info);

~NewGrpcMuxImpl() override;

// Causes all NewGrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
// on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
// preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
// would then cause all `NewGrpcMuxImpl` to be destructed.
// TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
static void shutdownAll();

void shutdown() { shutdown_ = true; }

GrpcMuxWatchPtr addWatch(const std::string& type_url,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
@@ -181,10 +170,6 @@ class NewGrpcMuxImpl
Common::CallbackHandlePtr dynamic_update_callback_handle_;
const envoy::config::core::v3::ApiVersion transport_api_version_;
Event::Dispatcher& dispatcher_;

// True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
// true because it may contain dangling pointers.
std::atomic<bool> shutdown_{false};
};

using NewGrpcMuxImplPtr = std::unique_ptr<NewGrpcMuxImpl>;
50 changes: 20 additions & 30 deletions source/common/conn_pool/conn_pool_base.cc
@@ -28,13 +28,9 @@ ConnPoolImplBase::ConnPoolImplBase(
upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {}

ConnPoolImplBase::~ConnPoolImplBase() {
ASSERT(isIdleImpl());
ASSERT(connecting_stream_capacity_ == 0);
}

void ConnPoolImplBase::deleteIsPendingImpl() {
deferred_deleting_ = true;
ASSERT(isIdleImpl());
ASSERT(ready_clients_.empty());
ASSERT(busy_clients_.empty());
ASSERT(connecting_clients_.empty());
ASSERT(connecting_stream_capacity_ == 0);
}

@@ -233,8 +229,6 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
}

ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context) {
ASSERT(!deferred_deleting_);

ASSERT(static_cast<ssize_t>(connecting_stream_capacity_) ==
connectingCapacity(connecting_clients_)); // O(n) debug check.
if (!ready_clients_.empty()) {
@@ -282,7 +276,6 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context)
}

bool ConnPoolImplBase::maybePreconnect(float global_preconnect_ratio) {
ASSERT(!deferred_deleting_);
return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
}

@@ -333,11 +326,9 @@ void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client,
}
}

void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) { idle_callbacks_.push_back(cb); }

void ConnPoolImplBase::startDrainImpl() {
is_draining_ = true;
checkForIdleAndCloseIdleConnsIfDraining();
void ConnPoolImplBase::addDrainedCallbackImpl(Instance::DrainedCb cb) {
drained_callbacks_.push_back(cb);
checkForDrained();
}

void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
@@ -379,19 +370,17 @@ void ConnPoolImplBase::drainConnectionsImpl() {
}
}

bool ConnPoolImplBase::isIdleImpl() const {
return pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() &&
connecting_clients_.empty();
}

void ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining() {
if (is_draining_) {
closeIdleConnectionsForDrainingPool();
void ConnPoolImplBase::checkForDrained() {
if (drained_callbacks_.empty()) {
return;
}

if (isIdleImpl()) {
ENVOY_LOG(debug, "invoking idle callbacks - is_draining_={}", is_draining_);
for (const Instance::IdleCb& cb : idle_callbacks_) {
closeIdleConnectionsForDrainingPool();

if (pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() &&
connecting_clients_.empty()) {
ENVOY_LOG(debug, "invoking drained callbacks");
for (const Instance::DrainedCb& cb : drained_callbacks_) {
cb();
}
}
@@ -454,8 +443,9 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
client.releaseResources();

dispatcher_.deferredDelete(client.removeFromList(owningList(client.state())));

checkForIdleAndCloseIdleConnsIfDraining();
if (incomplete_stream) {
checkForDrained();
}

client.setState(ActiveClient::State::CLOSED);

Expand All @@ -473,7 +463,7 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
// refer to client after this point.
onConnected(client);
onUpstreamReady();
checkForIdleAndCloseIdleConnsIfDraining();
checkForDrained();
}
}

Expand Down Expand Up @@ -543,7 +533,7 @@ void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
}

host_->cluster().stats().upstream_rq_cancelled_.inc();
checkForIdleAndCloseIdleConnsIfDraining();
checkForDrained();
}

namespace {