Skip to content

Commit

Permalink
cluster: destroy on main thread (envoyproxy#14954)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Dai <[email protected]>
  • Loading branch information
lambdai authored Feb 23, 2021
1 parent a50b1c0 commit 114d5ae
Show file tree
Hide file tree
Showing 22 changed files with 396 additions and 13 deletions.
6 changes: 6 additions & 0 deletions include/envoy/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ envoy_cc_library(
hdrs = ["deferred_deletable.h"],
)

envoy_cc_library(
name = "dispatcher_thread_deletable",
hdrs = ["dispatcher_thread_deletable.h"],
)

envoy_cc_library(
name = "dispatcher_interface",
hdrs = ["dispatcher.h"],
deps = [
":deferred_deletable",
":dispatcher_thread_deletable",
":file_event_interface",
":scaled_timer",
":schedulable_cb_interface",
Expand Down
12 changes: 12 additions & 0 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "envoy/common/scope_tracker.h"
#include "envoy/common/time.h"
#include "envoy/event/dispatcher_thread_deletable.h"
#include "envoy/event/file_event.h"
#include "envoy/event/scaled_timer.h"
#include "envoy/event/schedulable_cb.h"
Expand Down Expand Up @@ -260,6 +261,12 @@ class Dispatcher : public DispatcherBase {
*/
virtual void post(PostCb callback) PURE;

/**
* Post the deletable to this dispatcher. The deletable objects are guaranteed to be destroyed on
* the dispatcher's thread before dispatcher destroy. This is safe cross thread.
*/
virtual void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) PURE;

/**
* Runs the event loop. This will not return until exit() is called either from within a callback
* or from a different thread.
Expand Down Expand Up @@ -287,6 +294,11 @@ class Dispatcher : public DispatcherBase {
* Updates approximate monotonic time to current value.
*/
virtual void updateApproximateMonotonicTime() PURE;

/**
* Shutdown the dispatcher by clear dispatcher thread deletable.
*/
virtual void shutdown() PURE;
};

using DispatcherPtr = std::unique_ptr<Dispatcher>;
Expand Down
21 changes: 21 additions & 0 deletions include/envoy/event/dispatcher_thread_deletable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include <memory>

namespace Envoy {
namespace Event {

/**
* If an object derives from this class, it can be passed to the destination dispatcher who
* guarantees to delete it in that dispatcher thread. The common use case is to ensure config
* related objects are deleted in the main thread.
*/
class DispatcherThreadDeletable {
public:
virtual ~DispatcherThreadDeletable() = default;
};

using DispatcherThreadDeletableConstPtr = std::unique_ptr<const DispatcherThreadDeletable>;

} // namespace Event
} // namespace Envoy
69 changes: 67 additions & 2 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
? watermark_factory
: std::make_shared<Buffer::WatermarkBufferFactory>()),
scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
thread_local_delete_cb_(
base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
[this]() -> void { clearDeferredDeleteList(); })),
post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
Expand All @@ -74,7 +76,12 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
}

DispatcherImpl::~DispatcherImpl() { FatalErrorHandler::removeFatalErrorHandler(*this); }
DispatcherImpl::~DispatcherImpl() {
ENVOY_LOG(debug, "destroying dispatcher {}", name_);
FatalErrorHandler::removeFatalErrorHandler(*this);
// TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
// ASSERT(deletable_in_dispatcher_thread_.empty())
}

void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
std::chrono::milliseconds min_touch_interval) {
Expand Down Expand Up @@ -265,9 +272,23 @@ void DispatcherImpl::post(std::function<void()> callback) {
}
}

void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
bool need_schedule;
{
Thread::LockGuard lock(thread_local_deletable_lock_);
need_schedule = deletables_in_dispatcher_thread_.empty();
deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
// TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
// ASSERT(!shutdown_called_, "inserted after shutdown");
}

if (need_schedule) {
thread_local_delete_cb_->scheduleCallbackCurrentIteration();
}
}

void DispatcherImpl::run(RunType type) {
run_tid_ = api_.threadFactory().currentThreadId();

// Flush all post callbacks before we run the event loop. We do this because there are post
// callbacks that have to get run before the initial event loop starts running. libevent does
// not guarantee that events are run in any particular order. So even if we post() and call
Expand All @@ -280,12 +301,56 @@ MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
return approximate_monotonic_time_;
}

void DispatcherImpl::shutdown() {
// TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete below
// below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post
// callbacks and dispatcher thread deletable objects.
ASSERT(isThreadSafe());
auto deferred_deletables_size = current_to_delete_->size();
std::list<std::function<void()>>::size_type post_callbacks_size;
{
Thread::LockGuard lock(post_lock_);
post_callbacks_size = post_callbacks_.size();
}

std::list<DispatcherThreadDeletableConstPtr> local_deletables;
{
Thread::LockGuard lock(thread_local_deletable_lock_);
local_deletables = std::move(deletables_in_dispatcher_thread_);
}
auto thread_local_deletables_size = local_deletables.size();
while (!local_deletables.empty()) {
local_deletables.pop_front();
}
ASSERT(!shutdown_called_);
shutdown_called_ = true;
ENVOY_LOG(
trace,
"{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ",
__FUNCTION__, deferred_deletables_size, post_callbacks_size, thread_local_deletables_size);
}

void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); }

void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
approximate_monotonic_time_ = api_.timeSource().monotonicTime();
}

void DispatcherImpl::runThreadLocalDelete() {
std::list<DispatcherThreadDeletableConstPtr> to_be_delete;
{
Thread::LockGuard lock(thread_local_deletable_lock_);
to_be_delete = std::move(deletables_in_dispatcher_thread_);
ASSERT(deletables_in_dispatcher_thread_.empty());
}
while (!to_be_delete.empty()) {
// Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
// executing complicated destruction.
touchWatchdog();
// Delete in FIFO order.
to_be_delete.pop_front();
}
}
void DispatcherImpl::runPostCallbacks() {
// Clear the deferred delete list before running post callbacks to reduce non-determinism in
// callback processing, and more easily detect if a scheduled post callback refers to one of the
Expand Down
19 changes: 17 additions & 2 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
void exit() override;
SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override;
void post(std::function<void()> callback) override;
void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override;
void run(RunType type) override;
Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; }
void pushTrackedObject(const ScopeTrackedObject* object) override;
void popTrackedObject(const ScopeTrackedObject* expected_object) override;
MonotonicTime approximateMonotonicTime() const override;
void updateApproximateMonotonicTime() override;
void shutdown() override;

// FatalErrorInterface
void onFatalError(std::ostream& os) const override;
Expand Down Expand Up @@ -127,6 +129,8 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
TimerPtr createTimerInternal(TimerCb cb);
void updateApproximateMonotonicTimeInternal();
void runPostCallbacks();
void runThreadLocalDelete();

// Helper used to touch the watchdog after most schedulable, fd, and timer callbacks.
void touchWatchdog();

Expand All @@ -145,13 +149,24 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
Buffer::WatermarkFactorySharedPtr buffer_factory_;
LibeventScheduler base_scheduler_;
SchedulerPtr scheduler_;

SchedulableCallbackPtr thread_local_delete_cb_;
Thread::MutexBasicLockable thread_local_deletable_lock_;
// `deletables_in_dispatcher_thread` must be destroyed last to allow other callbacks populate.
std::list<DispatcherThreadDeletableConstPtr>
deletables_in_dispatcher_thread_ ABSL_GUARDED_BY(thread_local_deletable_lock_);
bool shutdown_called_{false};

SchedulableCallbackPtr deferred_delete_cb_;

SchedulableCallbackPtr post_cb_;
Thread::MutexBasicLockable post_lock_;
std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_);

std::vector<DeferredDeletablePtr> to_delete_1_;
std::vector<DeferredDeletablePtr> to_delete_2_;
std::vector<DeferredDeletablePtr>* current_to_delete_;
Thread::MutexBasicLockable post_lock_;
std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_);

absl::InlinedVector<const ScopeTrackedObject*, ExpectedMaxTrackedObjectStackDepth>
tracked_object_stack_;
bool deferred_deleting_{};
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ void AsyncStreamImpl::cleanup() {
// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
if (LinkedObject<AsyncStreamImpl>::inserted()) {
ASSERT(dispatcher_->isThreadSafe());
dispatcher_->deferredDelete(
LinkedObject<AsyncStreamImpl>::removeFromList(parent_.active_streams_));
}
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) {
}

void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
ASSERT(dispatcher().isThreadSafe());
// Map send calls after local closure to no-ops. The send call could have been queued prior to
// remote reset or closure, and/or closure could have occurred synchronously in response to a
// previous send. In these cases the router will have already cleaned up stream state. This
Expand All @@ -169,6 +170,7 @@ void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
}

void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) {
ASSERT(dispatcher().isThreadSafe());
// See explanation in sendData.
if (local_closed_) {
return;
Expand Down Expand Up @@ -226,6 +228,7 @@ void AsyncStreamImpl::reset() {
}

void AsyncStreamImpl::cleanup() {
ASSERT(dispatcher().isThreadSafe());
local_closed_ = remote_closed_ = true;
// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
Expand Down
4 changes: 4 additions & 0 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ Connection::State ConnectionImpl::state() const {
void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }

void ConnectionImpl::setTransportSocketIsReadable() {
ASSERT(dispatcher_.isThreadSafe());
// Remember that the transport requested read resumption, in case the resumption event is not
// scheduled immediately or is "lost" because read was disabled.
transport_wants_read_ = true;
Expand Down Expand Up @@ -301,6 +302,7 @@ void ConnectionImpl::noDelay(bool enable) {
}

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
ASSERT(dispatcher_.isThreadSafe());
if (inDelayedClose() || !filterChainWantsData()) {
return;
}
Expand Down Expand Up @@ -420,6 +422,7 @@ void ConnectionImpl::raiseEvent(ConnectionEvent event) {
bool ConnectionImpl::readEnabled() const {
// Calls to readEnabled on a closed socket are considered to be an error.
ASSERT(state() == State::Open);
ASSERT(dispatcher_.isThreadSafe());
return read_disable_count_ == 0;
}

Expand All @@ -437,6 +440,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) {

void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
ASSERT(!end_stream || enable_half_close_);
ASSERT(dispatcher_.isThreadSafe());

if (write_end_stream_) {
// It is an API violation to write more data after writing end_stream, but a duplicate
Expand Down
19 changes: 13 additions & 6 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -926,9 +926,16 @@ ClusterImplBase::ClusterImplBase(

auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
cluster.transport_socket_matches(), factory_context, socket_factory, *stats_scope);
info_ = std::make_unique<ClusterInfoImpl>(cluster, factory_context.clusterManager().bindConfig(),
runtime, std::move(socket_matcher),
std::move(stats_scope), added_via_api, factory_context);
auto& dispatcher = factory_context.dispatcher();
info_ = std::shared_ptr<const ClusterInfoImpl>(
new ClusterInfoImpl(cluster, factory_context.clusterManager().bindConfig(), runtime,
std::move(socket_matcher), std::move(stats_scope), added_via_api,
factory_context),
[&dispatcher](const ClusterInfoImpl* self) {
ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
dispatcher.deleteInDispatcherThread(
std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
});

if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN) &&
!raw_factory_pointer->supportsAlpn()) {
Expand Down Expand Up @@ -1120,7 +1127,7 @@ void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
for (size_t priority = 0; priority < host_sets.size(); ++priority) {
const auto& host_set = host_sets[priority];
// TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
HostVectorConstSharedPtr hosts_copy(new HostVector(host_set->hosts()));
HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());

HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
prioritySet().updateHosts(priority,
Expand Down Expand Up @@ -1311,10 +1318,10 @@ void PriorityStateManager::registerHostForPriority(
auto metadata = lb_endpoint.has_metadata()
? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
: nullptr;
const HostSharedPtr host(new HostImpl(
const auto host = std::make_shared<HostImpl>(
parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(),
locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(),
locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source));
locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source);
registerHostForPriority(host, locality_lb_endpoint);
}

Expand Down
4 changes: 3 additions & 1 deletion source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,9 @@ class PrioritySetImpl : public PrioritySet {
/**
* Implementation of ClusterInfo that reads from JSON.
*/
class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable<Logger::Id::upstream> {
class ClusterInfoImpl : public ClusterInfo,
public Event::DispatcherThreadDeletable,
protected Logger::Loggable<Logger::Id::upstream> {
public:
using HttpProtocolOptionsConfigImpl =
Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl;
Expand Down
1 change: 1 addition & 0 deletions source/server/config_validation/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ void ValidationInstance::shutdown() {
config_.clusterManager()->shutdown();
}
thread_local_.shutdownThread();
dispatcher_->shutdown();
}

} // namespace Server
Expand Down
1 change: 1 addition & 0 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ InstanceImpl::~InstanceImpl() {
ENVOY_LOG(debug, "destroying listener manager");
listener_manager_.reset();
ENVOY_LOG(debug, "destroyed listener manager");
dispatcher_->shutdown();
}

Upstream::ClusterManager& InstanceImpl::clusterManager() {
Expand Down
1 change: 1 addition & 0 deletions source/server/worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ void WorkerImpl::threadRoutine(GuardDog& guard_dog) {
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(debug, "worker exited dispatch loop");
guard_dog.stopWatching(watch_dog_);
dispatcher_->shutdown();

// We must close all active connections before we actually exit the thread. This prevents any
// destructors from running on the main thread which might reference thread locals. Destroying
Expand Down
Loading

0 comments on commit 114d5ae

Please sign in to comment.