Skip to content

Commit

Permalink
quic: fixing the disconnect between quiche stream count and Envoy (#1…
Browse files Browse the repository at this point in the history
…8694)

Actually fixing a quic stream limit issue. Also fixing an unrelated bug with clean stream shutdown occasionally causing spurious stream-close writes to a closed connection.

Risk Level: High (changing connection pool limits)
Testing: new integration test
Docs Changes: n/a
Release Notes: n/a
Platform Specific Features: n/a
Fixes #18160

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Oct 26, 2021
1 parent a191aa4 commit 0882d63
Show file tree
Hide file tree
Showing 13 changed files with 189 additions and 22 deletions.
8 changes: 8 additions & 0 deletions envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,14 @@ class ConnectionCallbacks {
* @param ReceivedSettings the settings received from the peer.
*/
virtual void onSettings(ReceivedSettings& settings) { UNREFERENCED_PARAMETER(settings); }

/**
* Fires when the MAX_STREAMS frame is received from the peer.
* This is an HTTP/3 frame, indicating the new maximum stream ID which can be opened.
* This may occur multiple times across the lifetime of an HTTP/3 connection.
* @param num_streams the number of streams now allowed to be opened.
*/
virtual void onMaxStreamsChanged(uint32_t num_streams) { UNREFERENCED_PARAMETER(num_streams); }
};

/**
Expand Down
21 changes: 16 additions & 5 deletions source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,23 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient&
}
ENVOY_CONN_LOG(debug, "creating stream", client);

// Latch capacity before updating remaining streams.
uint64_t capacity = client.currentUnusedCapacity();
client.remaining_streams_--;
if (client.remaining_streams_ == 0) {
ENVOY_CONN_LOG(debug, "maximum streams per connection, DRAINING", client);
host_->cluster().stats().upstream_cx_max_requests_.inc();
transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::DRAINING);
} else if (client.numActiveStreams() + 1 >= client.concurrent_stream_limit_) {
} else if (capacity == 1) {
// As soon as the new stream is created, the client will be maxed out.
transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::BUSY);
}

// Decrement the capacity, as there's one less stream available for serving.
state_.decrConnectingAndConnectedStreamCapacity(1);
// For HTTP/3, the capacity is updated in newStreamEncoder.
if (trackStreamCapacity()) {
state_.decrConnectingAndConnectedStreamCapacity(1);
}
// Track the new active stream.
state_.incrActiveStreams(1);
num_active_streams_++;
Expand All @@ -213,14 +218,17 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
// If the effective client capacity was limited by concurrency, increase connecting capacity.
// If the effective client capacity was limited by max total streams, this will not result in an
// increment as no capacity is freed up.
if (client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1 ||
had_negative_capacity) {
// We don't update the capacity for HTTP/3 as the stream count should only
// increase when a MAX_STREAMS frame is received.
if (trackStreamCapacity() && (client.remaining_streams_ > client.concurrent_stream_limit_ -
client.numActiveStreams() - 1 ||
had_negative_capacity)) {
state_.incrConnectingAndConnectedStreamCapacity(1);
}
if (client.state() == ActiveClient::State::DRAINING && client.numActiveStreams() == 0) {
// Close out the draining client if we no longer have active streams.
client.close();
} else if (client.state() == ActiveClient::State::BUSY) {
} else if (client.state() == ActiveClient::State::BUSY && client.currentUnusedCapacity() != 0) {
transitionActiveClientState(client, ActiveClient::State::READY);
if (!delay_attaching_stream) {
onUpstreamReady();
Expand Down Expand Up @@ -296,6 +304,9 @@ void ConnPoolImplBase::onUpstreamReady() {
state_.decrPendingStreams(1);
pending_streams_.pop_back();
}
if (!pending_streams_.empty()) {
tryCreateNewConnections();
}
}

std::list<ActiveClientPtr>& ConnPoolImplBase::owningList(ActiveClient::State state) {
Expand Down
19 changes: 16 additions & 3 deletions source/common/conn_pool/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class ActiveClient : public LinkedObject<ActiveClient>,
// Returns the application protocol, or absl::nullopt for TCP.
virtual absl::optional<Http::Protocol> protocol() const PURE;

int64_t currentUnusedCapacity() const {
virtual int64_t currentUnusedCapacity() const {
int64_t remaining_concurrent_streams =
static_cast<int64_t>(concurrent_stream_limit_) - numActiveStreams();

Expand Down Expand Up @@ -102,6 +102,11 @@ class ActiveClient : public LinkedObject<ActiveClient>,
virtual void drain();

ConnPoolImplBase& parent_;
// The count of remaining streams allowed for this connection.
// This will start out as the total number of streams per connection if capped
// by configuration, or it will be set to std::numeric_limits<uint32_t>::max() to be
// (functionally) unlimited.
// TODO: this could be moved to an optional to make it actually unlimited.
uint32_t remaining_streams_;
uint32_t concurrent_stream_limit_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
Expand Down Expand Up @@ -148,6 +153,10 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
virtual ~ConnPoolImplBase();

void deleteIsPendingImpl();
// By default, the connection pool will track connected and connecting stream
// capacity as streams are created and destroyed. QUIC does custom stream
// accounting so will override this to false.
virtual bool trackStreamCapacity() { return true; }

// A helper function to get the specific context type from the base class context.
template <class T> T& typedContext(AttachContext& context) {
Expand Down Expand Up @@ -234,6 +243,9 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
void decrClusterStreamCapacity(uint32_t delta) {
state_.decrConnectingAndConnectedStreamCapacity(delta);
}
void incrClusterStreamCapacity(uint32_t delta) {
state_.incrConnectingAndConnectedStreamCapacity(delta);
}
void dumpState(std::ostream& os, int indent_level = 0) const {
const char* spaces = spacesForLevel(indent_level);
os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size())
Expand All @@ -255,6 +267,9 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
connecting_stream_capacity_ -= delta;
}

// Called when an upstream is ready to serve pending streams.
void onUpstreamReady();

protected:
virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {}

Expand All @@ -265,7 +280,6 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
NoConnectionRateLimited,
CreatedButRateLimited,
};

// Creates up to 3 connections, based on the preconnect ratio.
// Returns the ConnectionResult of the last attempt.
ConnectionResult tryCreateNewConnections();
Expand Down Expand Up @@ -342,7 +356,6 @@ class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
// True iff this object is in the deferred delete list.
bool deferred_deleting_{false};

void onUpstreamReady();
Event::SchedulableCallbackPtr upstream_ready_cb_;
};

Expand Down
5 changes: 5 additions & 0 deletions source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ class CodecClient : protected Logger::Loggable<Logger::Id::client>,
codec_callbacks_->onSettings(settings);
}
}
void onMaxStreamsChanged(uint32_t num_streams) override {
if (codec_callbacks_) {
codec_callbacks_->onMaxStreamsChanged(num_streams);
}
}

void onIdleTimeout() {
host_->cluster().stats().upstream_cx_idle_timeout_.inc();
Expand Down
9 changes: 9 additions & 0 deletions source/common/http/http3/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ ActiveClient::ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent,
parent.host()->cluster().stats().upstream_cx_http3_total_, data) {
}

void ActiveClient::onMaxStreamsChanged(uint32_t num_streams) {
updateCapacity(num_streams);
if (state() == ActiveClient::State::BUSY && currentUnusedCapacity() != 0) {
parent_.transitionActiveClientState(*this, ActiveClient::State::READY);
// If there's waiting streams, make sure the pool will now serve them.
parent_.onUpstreamReady();
}
}

void Http3ConnPoolImpl::setQuicConfigFromClusterConfig(const Upstream::ClusterInfo& cluster,
quic::QuicConfig& quic_config) {
Quic::convertQuicConfig(cluster.http3Options().quic_protocol_options(), quic_config);
Expand Down
62 changes: 62 additions & 0 deletions source/common/http/http3/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,65 @@ class ActiveClient : public MultiplexedActiveClientBase {
public:
ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent,
Upstream::Host::CreateConnectionData& data);

// Http::ConnectionCallbacks
void onMaxStreamsChanged(uint32_t num_streams) override;

RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override {
ASSERT(quiche_capacity_ != 0);
// Each time a quic stream is allocated the quic capacity needs to get
// decremented. See comments by quiche_capacity_.
updateCapacity(quiche_capacity_ - 1);
return MultiplexedActiveClientBase::newStreamEncoder(response_decoder);
}

// Overload the default capacity calculations to return the quic capacity
// (modified by any stream limits in Envoy config)
int64_t currentUnusedCapacity() const override {
return std::min<int64_t>(quiche_capacity_, effectiveConcurrentStreamLimit());
}

void updateCapacity(uint64_t new_quiche_capacity) {
// Each time we update the capacity make sure to reflect the update in the
// connection pool.
//
// Due to interplay between the max number of concurrent streams Envoy will
// allow and the max number of streams per connection this is not as simple
// as just updating based on the delta between quiche_capacity_ and
// new_quiche_capacity, so we use the delta between the actual calculated
// capacity before and after the update.
uint64_t old_capacity = currentUnusedCapacity();
quiche_capacity_ = new_quiche_capacity;
uint64_t new_capacity = currentUnusedCapacity();

if (new_capacity < old_capacity) {
parent_.decrClusterStreamCapacity(old_capacity - new_capacity);
} else if (old_capacity < new_capacity) {
parent_.incrClusterStreamCapacity(new_capacity - old_capacity);
}
}

// Unlike HTTP/2 and HTTP/1, rather than having a cap on the number of active
// streams, QUIC has a fixed number of streams available which is updated via
// the MAX_STREAMS frame.
//
// As such each time we create a new stream for QUIC, the capacity goes down
// by one, but unlike the other two codecs it is _not_ restored on stream
// closure.
//
// We track the QUIC capacity here, and overload currentUnusedCapacity so the
// connection pool can accurately keep track of when it is safe to create new
// streams.
//
// Though HTTP/3 should arguably start out with 0 stream capacity until the
// initial handshake is complete and MAX_STREAMS frame has been received,
// assume optimistically it will get ~100 streams, so that the connection pool
// won't fetch a connection for each incoming stream but will assume that the
// first connection will likely be able to serve 100.
// This number will be updated to the correct value before the connection is
// deemed connected, at which point further connections will be established if
// necessary.
uint64_t quiche_capacity_ = 100;
};

// Http3 subclass of FixedHttpConnPoolImpl which exists to store quic data.
Expand All @@ -45,6 +104,9 @@ class Http3ConnPoolImpl : public FixedHttpConnPoolImpl {
quic::QuicConfig& quic_config);

Quic::PersistentQuicInfoImpl& quicInfo() { return *quic_info_; }
// For HTTP/3 the base connection pool does not track stream capacity, rather
// the HTTP3 active client does.
bool trackStreamCapacity() override { return false; }

private:
// Store quic helpers which can be shared between connections and must live
Expand Down
40 changes: 39 additions & 1 deletion source/common/quic/envoy_quic_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@

#include "quic_filter_manager_connection_impl.h"

namespace quic {
namespace test {

// TODO(alyssawilk) add the necessary accessors to quiche and remove this.
class QuicSessionPeer {
public:
static quic::QuicStreamIdManager&
getStreamIdManager(Envoy::Quic::EnvoyQuicClientSession* session) {
return session->ietf_streamid_manager_.bidirectional_stream_id_manager_;
}
};

} // namespace test
} // namespace quic

namespace Envoy {
namespace Quic {

Expand Down Expand Up @@ -81,6 +96,16 @@ void EnvoyQuicClientSession::OnRstStream(const quic::QuicRstStreamFrame& frame)
/*from_self*/ false, /*is_upstream*/ true);
}

void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool unidirectional) {
if (!http_connection_callbacks_ || unidirectional) {
return;
}
uint32_t streams_available = streamsAvailable();
if (streams_available > 0) {
http_connection_callbacks_->onMaxStreamsChanged(streams_available);
}
}

std::unique_ptr<quic::QuicSpdyClientStream> EnvoyQuicClientSession::CreateClientStream() {
ASSERT(codec_stats_.has_value() && http3_options_.has_value());
return std::make_unique<EnvoyQuicClientStream>(GetNextOutgoingBidirectionalStreamId(), this,
Expand Down Expand Up @@ -109,9 +134,22 @@ quic::QuicConnection* EnvoyQuicClientSession::quicConnection() {
return initialized_ ? connection() : nullptr;
}

uint64_t EnvoyQuicClientSession::streamsAvailable() {
quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStreamIdManager(this);
ASSERT(manager.outgoing_max_streams() >= manager.outgoing_stream_count());
uint32_t streams_available = manager.outgoing_max_streams() - manager.outgoing_stream_count();
return streams_available;
}

void EnvoyQuicClientSession::OnTlsHandshakeComplete() {
quic::QuicSpdyClientSession::OnTlsHandshakeComplete();
raiseConnectionEvent(Network::ConnectionEvent::Connected);

// TODO(alyssawilk) support the case where a connection starts with 0 max streams.
ASSERT(streamsAvailable());
if (streamsAvailable() > 0) {
OnCanCreateNewOutgoingStream(false);
raiseConnectionEvent(Network::ConnectionEvent::Connected);
}
}

std::unique_ptr<quic::QuicCryptoClientStreamBase> EnvoyQuicClientSession::CreateQuicCryptoStream() {
Expand Down
8 changes: 8 additions & 0 deletions source/common/quic/envoy_quic_client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
namespace Envoy {
namespace Quic {

class EnvoyQuicClientSession;

// Act as a Network::ClientConnection to ClientCodec.
// TODO(danzh) This class doesn't need to inherit Network::FilterManager
// interface but need all other Network::Connection implementation in
Expand Down Expand Up @@ -58,6 +60,7 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl,
void MaybeSendRstStreamFrame(quic::QuicStreamId id, quic::QuicResetStreamError error,
quic::QuicStreamOffset bytes_written) override;
void OnRstStream(const quic::QuicRstStreamFrame& frame) override;

// quic::QuicSpdyClientSessionBase
bool ShouldKeepConnectionAlive() const override;
// quic::ProofHandler
Expand All @@ -73,6 +76,9 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl,
// QuicFilterManagerConnectionImpl
void setHttp3Options(const envoy::config::core::v3::Http3ProtocolOptions& http3_options) override;

// Notify any registered connection pool when new streams are available.
void OnCanCreateNewOutgoingStream(bool) override;

using quic::QuicSpdyClientSession::PerformActionOnActiveStreams;

protected:
Expand All @@ -95,6 +101,8 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl,
quic::QuicConnection* quicConnection() override;

private:
uint64_t streamsAvailable();

// These callbacks are owned by network filters and quic session should outlive
// them.
Http::ConnectionCallbacks* http_connection_callbacks_{nullptr};
Expand Down
4 changes: 3 additions & 1 deletion source/common/quic/envoy_quic_client_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ void EnvoyQuicClientStream::ResetWithError(quic::QuicResetStreamError error) {
stats_.tx_reset_.inc();
// Upper layers expect calling resetStream() to immediately raise reset callbacks.
runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error.internal_code()));
quic::QuicSpdyClientStream::ResetWithError(error);
if (session()->connection()->connected()) {
quic::QuicSpdyClientStream::ResetWithError(error);
}
}

void EnvoyQuicClientStream::OnConnectionClosed(quic::QuicErrorCode error,
Expand Down
3 changes: 3 additions & 0 deletions test/integration/base_integration_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ class BaseIntegrationTest : protected Logger::Loggable<Logger::Id::testing> {
void mergeOptions(envoy::config::core::v3::Http2ProtocolOptions& options) {
upstream_config_.http2_options_.MergeFrom(options);
}
void mergeOptions(envoy::config::listener::v3::QuicProtocolOptions& options) {
upstream_config_.quic_options_.MergeFrom(options);
}

std::unique_ptr<Stats::Scope> upstream_stats_store_;

Expand Down
2 changes: 1 addition & 1 deletion test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket
FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket_factory,
Network::SocketPtr&& listen_socket, const FakeUpstreamConfig& config)
: http_type_(config.upstream_protocol_), http2_options_(config.http2_options_),
http3_options_(config.http3_options_),
http3_options_(config.http3_options_), quic_options_(config.quic_options_),
socket_(Network::SocketSharedPtr(listen_socket.release())),
socket_factory_(std::make_unique<FakeListenSocketFactory>(socket_)),
api_(Api::createApiForTest(stats_store_)), time_system_(config.time_system_),
Expand Down
4 changes: 3 additions & 1 deletion test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ struct FakeUpstreamConfig {
absl::optional<UdpConfig> udp_fake_upstream_;
envoy::config::core::v3::Http2ProtocolOptions http2_options_;
envoy::config::core::v3::Http3ProtocolOptions http3_options_;
envoy::config::listener::v3::QuicProtocolOptions quic_options_;
uint32_t max_request_headers_kb_ = Http::DEFAULT_MAX_REQUEST_HEADERS_KB;
uint32_t max_request_headers_count_ = Http::DEFAULT_MAX_HEADERS_COUNT;
envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
Expand Down Expand Up @@ -760,7 +761,7 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
if (is_quic) {
#if defined(ENVOY_ENABLE_QUIC)
udp_listener_config_.listener_factory_ = std::make_unique<Quic::ActiveQuicListenerFactory>(
envoy::config::listener::v3::QuicProtocolOptions(), 1, parent_.quic_stat_names_);
parent_.quic_options_, 1, parent_.quic_stat_names_);
// Initialize QUICHE flags.
quiche::FlagRegistry::getInstance();
#else
Expand Down Expand Up @@ -823,6 +824,7 @@ class FakeUpstream : Logger::Loggable<Logger::Id::testing>,

const envoy::config::core::v3::Http2ProtocolOptions http2_options_;
const envoy::config::core::v3::Http3ProtocolOptions http3_options_;
envoy::config::listener::v3::QuicProtocolOptions quic_options_;
Network::SocketSharedPtr socket_;
Network::ListenSocketFactoryPtr socket_factory_;
ConditionalInitializer server_initialized_;
Expand Down
Loading

0 comments on commit 0882d63

Please sign in to comment.