diff --git a/source/common/http/conn_pool_grid.cc b/source/common/http/conn_pool_grid.cc index 5a3fdef5b503..995e0dc0c7f7 100644 --- a/source/common/http/conn_pool_grid.cc +++ b/source/common/http/conn_pool_grid.cc @@ -6,15 +6,17 @@ namespace Envoy { namespace Http { +namespace { absl::string_view describePool(const ConnectionPool::Instance& pool) { return pool.protocolDescription(); } +} // namespace ConnectivityGrid::WrapperCallbacks::WrapperCallbacks(ConnectivityGrid& grid, Http::ResponseDecoder& decoder, PoolIterator pool_it, ConnectionPool::Callbacks& callbacks) - : grid_(grid), decoder_(decoder), inner_callbacks_(callbacks), + : grid_(grid), decoder_(decoder), inner_callbacks_(&callbacks), next_attempt_timer_( grid_.dispatcher_.createTimer([this]() -> void { tryAnotherConnection(); })), current_(pool_it) {} @@ -24,6 +26,12 @@ ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::ConnectionAttemp WrapperCallbacks& parent, PoolIterator it) : parent_(parent), pool_it_(it), cancellable_(nullptr) {} +ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::~ConnectionAttemptCallbacks() { + if (cancellable_ != nullptr) { + cancel(Envoy::ConnectionPool::CancelPolicy::Default); + } +} + ConnectivityGrid::StreamCreationResult ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::newStream() { auto* cancellable = pool().newStream(parent_.decoder_, *this); @@ -37,14 +45,21 @@ ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::newStream() { void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::onPoolFailure( ConnectionPool::PoolFailureReason reason, absl::string_view transport_failure_reason, Upstream::HostDescriptionConstSharedPtr host) { + cancellable_ = nullptr; // Attempt failed and can no longer be cancelled. parent_.onConnectionAttemptFailed(this, reason, transport_failure_reason, host); } void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptFailed( ConnectionAttemptCallbacks* attempt, ConnectionPool::PoolFailureReason reason, absl::string_view transport_failure_reason, Upstream::HostDescriptionConstSharedPtr host) { + ASSERT(host == grid_.host_); ENVOY_LOG(trace, "{} pool failed to create connection to host '{}'.", - describePool(attempt->pool()), grid_.host_->hostname()); + describePool(attempt->pool()), host->hostname()); + if (grid_.isPoolHttp3(attempt->pool())) { + http3_attempt_failed_ = true; + } + maybeMarkHttp3Broken(); + auto delete_this_on_return = attempt->removeFromList(connection_attempts_); // If there is another connection attempt in flight then let that proceed. @@ -58,12 +73,15 @@ void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptFailed( } // If this point is reached, all pools have been tried. Pass the pool failure up to the - // original caller. - ConnectionPool::Callbacks& callbacks = inner_callbacks_; - ENVOY_LOG(trace, "Passing pool failure up to caller.", describePool(attempt->pool()), - grid_.host_->hostname()); + // original caller, if the caller hasn't already been notified. + ConnectionPool::Callbacks* callbacks = inner_callbacks_; + inner_callbacks_ = nullptr; deleteThis(); - callbacks.onPoolFailure(reason, transport_failure_reason, host); + if (callbacks != nullptr) { + ENVOY_LOG(trace, "Passing pool failure up to caller.", describePool(attempt->pool()), + host->hostname()); + callbacks->onPoolFailure(reason, transport_failure_reason, host); + } } void ConnectivityGrid::WrapperCallbacks::deleteThis() { @@ -87,37 +105,65 @@ void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptReady( ConnectionAttemptCallbacks* attempt, RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, absl::optional protocol) { + ASSERT(host == grid_.host_); ENVOY_LOG(trace, "{} pool successfully connected to host '{}'.", describePool(attempt->pool()), - grid_.host_->hostname()); - auto delete_on_return = attempt->removeFromList(connection_attempts_); - // The first successful connection is passed up, and all others will be canceled. - // TODO: Ensure that if HTTP/2 succeeds, we can allow the HTTP/3 connection to run to completion. - for (auto& attempt : connection_attempts_) { - attempt->cancel(Envoy::ConnectionPool::CancelPolicy::Default); + host->hostname()); + if (!grid_.isPoolHttp3(attempt->pool())) { + tcp_attempt_succeeded_ = true; + } + maybeMarkHttp3Broken(); + + auto delete_this_on_return = attempt->removeFromList(connection_attempts_); + ConnectionPool::Callbacks* callbacks = inner_callbacks_; + inner_callbacks_ = nullptr; + // If an HTTP/3 connection attempts is in progress, let it complete so that if it succeeds + // it can be used for future requests. But if there is a TCP connection attempt in progress, + // cancel it. + if (grid_.isPoolHttp3(attempt->pool())) { + cancelAllPendingAttempts(Envoy::ConnectionPool::CancelPolicy::Default); + } + if (connection_attempts_.empty()) { + deleteThis(); + } + if (callbacks != nullptr) { + callbacks->onPoolReady(encoder, host, info, protocol); + } +} + +void ConnectivityGrid::WrapperCallbacks::maybeMarkHttp3Broken() { + if (http3_attempt_failed_ && tcp_attempt_succeeded_) { + ENVOY_LOG(trace, "Marking HTTP/3 broken for host '{}'.", grid_.host_->hostname()); + grid_.setIsHttp3Broken(true); } - ConnectionPool::Callbacks& callbacks = inner_callbacks_; - deleteThis(); - return callbacks.onPoolReady(encoder, host, info, protocol); } void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::onPoolReady( RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, absl::optional protocol) { + cancellable_ = nullptr; // Attempt succeeded and can no longer be cancelled. parent_.onConnectionAttemptReady(this, encoder, host, info, protocol); } void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::cancel( Envoy::ConnectionPool::CancelPolicy cancel_policy) { - cancellable_->cancel(cancel_policy); + auto cancellable = cancellable_; + cancellable_ = nullptr; // Prevent repeated cancellations. + cancellable->cancel(cancel_policy); } void ConnectivityGrid::WrapperCallbacks::cancel(Envoy::ConnectionPool::CancelPolicy cancel_policy) { // If the newStream caller cancels the stream request, pass the cancellation on // to each connection attempt. + cancelAllPendingAttempts(cancel_policy); + deleteThis(); +} + +void ConnectivityGrid::WrapperCallbacks::cancelAllPendingAttempts( + Envoy::ConnectionPool::CancelPolicy cancel_policy) { for (auto& attempt : connection_attempts_) { attempt->cancel(cancel_policy); } - deleteThis(); + connection_attempts_.clear(); } bool ConnectivityGrid::WrapperCallbacks::tryAnotherConnection() { @@ -154,13 +200,16 @@ ConnectivityGrid::ConnectivityGrid( ConnectivityGrid::~ConnectivityGrid() { // Ignore drained callbacks while the pools are destroyed below. destroying_ = true; + // Callbacks might have pending streams registered with the pools, so cancel and delete + // the callback before deleting the pools. + wrapped_callbacks_.clear(); pools_.clear(); } absl::optional ConnectivityGrid::createNextPool() { // Pools are created by newStream, which should not be called during draining. ASSERT(drained_callbacks_.empty()); - // Right now, only H3 and ALPN are supported, so if there are 2 pools we're done. + // Right now, only H3 and TCP are supported, so if there are 2 pools we're done. if (pools_.size() == 2 || !drained_callbacks_.empty()) { return absl::nullopt; } @@ -193,17 +242,22 @@ ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder& if (pools_.empty()) { createNextPool(); } - - // TODO(#15649) track pools with successful connections: don't always start at - // the front of the list. - auto wrapped_callback = - std::make_unique(*this, decoder, pools_.begin(), callbacks); + PoolIterator pool = pools_.begin(); + if (is_http3_broken_) { + ENVOY_LOG(trace, "HTTP/3 is broken to host '{}', skipping.", describePool(**pool), + host_->hostname()); + // Since HTTP/3 is broken, presumably both pools have already been created so this + // is just to be safe. + createNextPool(); + ++pool; + } + auto wrapped_callback = std::make_unique(*this, decoder, pool, callbacks); ConnectionPool::Cancellable* ret = wrapped_callback.get(); LinkedList::moveIntoList(std::move(wrapped_callback), wrapped_callbacks_); - // Note that in the case of immediate attempt/failure, newStream will delete this. if (wrapped_callbacks_.front()->newStream() == StreamCreationResult::ImmediateResult) { // If newStream succeeds, return nullptr as the caller has received their - // callback and does not need a cancellable handle. + // callback and does not need a cancellable handle. At this point the + // WrappedCallbacks object has also been deleted. return nullptr; } return ret; @@ -248,6 +302,10 @@ absl::optional ConnectivityGrid::nextPool(PoolIt return createNextPool(); } +bool ConnectivityGrid::isPoolHttp3(const ConnectionPool::Instance& pool) { + return &pool == pools_.begin()->get(); +} + void ConnectivityGrid::onDrainReceived() { // Don't do any work under the stack of ~ConnectivityGrid() if (destroying_) { diff --git a/source/common/http/conn_pool_grid.h b/source/common/http/conn_pool_grid.h index bd1989f81e6e..d4d2ae2fc92d 100644 --- a/source/common/http/conn_pool_grid.h +++ b/source/common/http/conn_pool_grid.h @@ -42,6 +42,7 @@ class ConnectivityGrid : public ConnectionPool::Instance, public LinkedObject { public: ConnectionAttemptCallbacks(WrapperCallbacks& parent, PoolIterator it); + ~ConnectionAttemptCallbacks() override; StreamCreationResult newStream(); @@ -74,9 +75,6 @@ class ConnectivityGrid : public ConnectionPool::Instance, // Attempt to create a new stream for pool(). StreamCreationResult newStream(); - // Removes this from the owning list, deleting it. - void deleteThis(); - // Called on pool failure or timeout to kick off another connection attempt. // Returns true if there is a failover pool and a connection has been // attempted, false if all pools have been tried. @@ -95,6 +93,16 @@ class ConnectivityGrid : public ConnectionPool::Instance, absl::optional protocol); private: + // Removes this from the owning list, deleting it. + void deleteThis(); + + // Marks HTTP/3 broken if the HTTP/3 attempt failed but a TCP attempt succeeded. + // While HTTP/3 is broken the grid will not attempt to make new HTTP/3 connections. + void maybeMarkHttp3Broken(); + + // Cancels any pending attempts and deletes them. + void cancelAllPendingAttempts(Envoy::ConnectionPool::CancelPolicy cancel_policy); + // Tracks all the connection attempts which currently in flight. std::list connection_attempts_; @@ -103,12 +111,17 @@ class ConnectivityGrid : public ConnectionPool::Instance, // The decoder for the original newStream, needed to create streams on subsequent pools. Http::ResponseDecoder& decoder_; // The callbacks from the original caller, which must get onPoolFailure or - // onPoolReady unless there is call to cancel(). - ConnectionPool::Callbacks& inner_callbacks_; + // onPoolReady unless there is call to cancel(). Will be nullptr if the caller + // has been notified while attempts are still pending. + ConnectionPool::Callbacks* inner_callbacks_; // The timer which tracks when new connections should be attempted. Event::TimerPtr next_attempt_timer_; // The iterator to the last pool which had a connection attempt. PoolIterator current_; + // True if the HTTP/3 attempt failed. + bool http3_attempt_failed_{}; + // True if the TCP attempt succeeded. + bool tcp_attempt_succeeded_{}; }; using WrapperCallbacksPtr = std::unique_ptr; @@ -134,6 +147,12 @@ class ConnectivityGrid : public ConnectionPool::Instance, // Returns the next pool in the ordered priority list. absl::optional nextPool(PoolIterator pool_it); + // Returns true if pool is the grid's HTTP/3 connection pool. + bool isPoolHttp3(const ConnectionPool::Instance& pool); + + bool isHttp3Broken() const { return is_http3_broken_; } + void setIsHttp3Broken(bool is_http3_broken) { is_http3_broken_ = is_http3_broken; } + private: friend class ConnectivityGridForTest; @@ -155,6 +174,7 @@ class ConnectivityGrid : public ConnectionPool::Instance, Upstream::ClusterConnectivityState& state_; std::chrono::milliseconds next_attempt_duration_; TimeSource& time_source_; + bool is_http3_broken_{}; // Tracks how many drains are needed before calling drain callbacks. This is // set to the number of pools when the first drain callbacks are added, and diff --git a/test/common/http/conn_pool_grid_test.cc b/test/common/http/conn_pool_grid_test.cc index 27426a6a9381..0406db0f28e7 100644 --- a/test/common/http/conn_pool_grid_test.cc +++ b/test/common/http/conn_pool_grid_test.cc @@ -46,14 +46,14 @@ class ConnectivityGridForTest : public ConnectivityGrid { if (immediate_success_) { callbacks.onPoolReady(*encoder_, host(), *info_, absl::nullopt); return nullptr; - } else if (immediate_failure_) { + } + if (immediate_failure_) { callbacks.onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure, "reason", host()); return nullptr; - } else { - callbacks_.push_back(&callbacks); - return &cancel_; } + callbacks_.push_back(&callbacks); + return &cancel_; })); if (pools_.size() == 1) { EXPECT_CALL(*first(), protocolDescription()) @@ -97,11 +97,11 @@ class ConnectivityGridTest : public Event::TestUsingSimulatedTime, public testin public: ConnectivityGridTest() : options_({Http::Protocol::Http11, Http::Protocol::Http2, Http::Protocol::Http3}), - host_(new NiceMock()), grid_(dispatcher_, random_, Upstream::makeTestHost(cluster_, "hostname", "tcp://127.0.0.1:9000", simTime()), Upstream::ResourcePriority::Default, socket_options_, transport_socket_options_, - state_, simTime(), std::chrono::milliseconds(300), options_) { + state_, simTime(), std::chrono::milliseconds(300), options_), + host_(grid_.host()) { grid_.info_ = &info_; grid_.encoder_ = &encoder_; } @@ -109,12 +109,12 @@ class ConnectivityGridTest : public Event::TestUsingSimulatedTime, public testin const Network::ConnectionSocket::OptionsSharedPtr socket_options_; const Network::TransportSocketOptionsSharedPtr transport_socket_options_; ConnectivityGrid::ConnectivityOptions options_; - Upstream::HostDescriptionConstSharedPtr host_; Upstream::ClusterConnectivityState state_; NiceMock dispatcher_; std::shared_ptr cluster_{new NiceMock()}; NiceMock random_; ConnectivityGridForTest grid_; + Upstream::HostDescriptionConstSharedPtr host_; NiceMock callbacks_; NiceMock decoder_; @@ -134,6 +134,7 @@ TEST_F(ConnectivityGridTest, Success) { ASSERT_NE(grid_.callbacks(), nullptr); EXPECT_CALL(callbacks_.pool_ready_, ready()); grid_.callbacks()->onPoolReady(encoder_, host_, info_, absl::nullopt); + EXPECT_FALSE(grid_.isHttp3Broken()); } // Test the first pool successfully connecting under the stack of newStream. @@ -143,6 +144,7 @@ TEST_F(ConnectivityGridTest, ImmediateSuccess) { EXPECT_CALL(callbacks_.pool_ready_, ready()); EXPECT_EQ(grid_.newStream(decoder_, callbacks_), nullptr); EXPECT_NE(grid_.first(), nullptr); + EXPECT_FALSE(grid_.isHttp3Broken()); } // Test the first pool failing and the second connecting. @@ -171,6 +173,7 @@ TEST_F(ConnectivityGridTest, FailureThenSuccessSerial) { EXPECT_CALL(callbacks_.pool_ready_, ready()); EXPECT_LOG_CONTAINS("trace", "second pool successfully connected to host 'hostname'", grid_.callbacks(1)->onPoolReady(encoder_, host_, info_, absl::nullopt)); + EXPECT_TRUE(grid_.isHttp3Broken()); } // Test both connections happening in parallel and the second connecting. @@ -200,6 +203,7 @@ TEST_F(ConnectivityGridTest, TimeoutThenSuccessParallelSecondConnects) { EXPECT_NE(grid_.callbacks(), nullptr); EXPECT_CALL(callbacks_.pool_ready_, ready()); grid_.callbacks(1)->onPoolReady(encoder_, host_, info_, absl::nullopt); + EXPECT_TRUE(grid_.isHttp3Broken()); } // Test both connections happening in parallel and the first connecting. @@ -227,6 +231,38 @@ TEST_F(ConnectivityGridTest, TimeoutThenSuccessParallelFirstConnects) { EXPECT_NE(grid_.callbacks(0), nullptr); EXPECT_CALL(callbacks_.pool_ready_, ready()); grid_.callbacks(0)->onPoolReady(encoder_, host_, info_, absl::nullopt); + EXPECT_FALSE(grid_.isHttp3Broken()); +} + +// Test both connections happening in parallel and the second connecting before +// the first eventually fails. +TEST_F(ConnectivityGridTest, TimeoutThenSuccessParallelSecondConnectsFirstFail) { + EXPECT_EQ(grid_.first(), nullptr); + + // This timer will be returned and armed as the grid creates the wrapper's failover timer. + Event::MockTimer* failover_timer = new NiceMock(&dispatcher_); + + grid_.newStream(decoder_, callbacks_); + EXPECT_NE(grid_.first(), nullptr); + EXPECT_TRUE(failover_timer->enabled_); + + // Kick off the second connection. + failover_timer->invokeCallback(); + EXPECT_NE(grid_.second(), nullptr); + + // onPoolReady should be passed from the pool back to the original caller. + EXPECT_NE(grid_.callbacks(1), nullptr); + EXPECT_CALL(callbacks_.pool_ready_, ready()); + grid_.callbacks(1)->onPoolReady(encoder_, host_, info_, absl::nullopt); + EXPECT_FALSE(grid_.isHttp3Broken()); + + // onPoolFailure should not be passed up the first time. Instead the grid + // should wait on the other pool + EXPECT_NE(grid_.callbacks(0), nullptr); + EXPECT_CALL(callbacks_.pool_failure_, ready()).Times(0); + grid_.callbacks(0)->onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure, + "reason", host_); + EXPECT_TRUE(grid_.isHttp3Broken()); } // Test that after the first pool fails, subsequent connections will @@ -260,6 +296,7 @@ TEST_F(ConnectivityGridTest, ImmediateDoubleFailure) { grid_.immediate_failure_ = true; EXPECT_CALL(callbacks_.pool_failure_, ready()); EXPECT_EQ(grid_.newStream(decoder_, callbacks_), nullptr); + EXPECT_FALSE(grid_.isHttp3Broken()); } // Test both connections happening in parallel and both failing. @@ -287,6 +324,7 @@ TEST_F(ConnectivityGridTest, TimeoutDoubleFailureParallel) { EXPECT_CALL(callbacks_.pool_failure_, ready()); grid_.callbacks(1)->onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure, "reason", host_); + EXPECT_FALSE(grid_.isHttp3Broken()); } // Test cancellation @@ -384,6 +422,23 @@ TEST_F(ConnectivityGridTest, NoDrainOnTeardown) { EXPECT_FALSE(drain_received); } +// Test that when HTTP/3 is broken then the HTTP/3 pool is skipped. +TEST_F(ConnectivityGridTest, SuccessAfterBroken) { + grid_.setIsHttp3Broken(true); + EXPECT_EQ(grid_.first(), nullptr); + + EXPECT_LOG_CONTAINS("trace", "HTTP/3 is broken to host 'first', skipping.", + EXPECT_NE(grid_.newStream(decoder_, callbacks_), nullptr)); + EXPECT_NE(grid_.first(), nullptr); + EXPECT_NE(grid_.second(), nullptr); + + // onPoolReady should be passed from the pool back to the original caller. + ASSERT_NE(grid_.callbacks(), nullptr); + EXPECT_CALL(callbacks_.pool_ready_, ready()); + grid_.callbacks()->onPoolReady(encoder_, host_, info_, absl::nullopt); + EXPECT_TRUE(grid_.isHttp3Broken()); +} + #ifdef ENVOY_ENABLE_QUICHE } // namespace