diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 278f0d325f87..6856bf482532 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -5,6 +5,7 @@ Version history ================ Changes ------- +* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers. * udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash. 1.13.6 (September 29, 2020) diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 5e9b57a899d8..4d2a00d382af 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -51,7 +51,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt [this]() -> void { this->onHighWatermark(); })), read_enabled_(true), above_high_watermark_(false), detect_early_close_(true), enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false), - write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) { + write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false), + transport_wants_read_(false) { // Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM // condition and just crash. RELEASE_ASSERT(ioHandle().fd() != -1, ""); @@ -339,7 +340,13 @@ void ConnectionImpl::readDisable(bool disable) { // If the connection has data buffered there's no guarantee there's also data in the kernel // which will kick off the filter chain. Instead fake an event to make sure the buffered data // gets processed regardless and ensure that we dispatch it via onRead. - if (read_buffer_.length() > 0) { + if (read_buffer_.length() > 0 || transport_wants_read_) { + // If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be + // able to process additional bytes even if there is no data in the kernel to kick off the + // filter chain. Alternately if the read buffer has data the fd could be read disabled. To + // handle these cases, fake an event to make sure the buffered data in the read buffer or in + // transport socket internal buffers gets processed regardless and ensure that we dispatch it + // via onRead. dispatch_buffered_data_ = true; file_event_->activate(Event::FileReadyType::Read); } @@ -509,9 +516,15 @@ void ConnectionImpl::onFileEvent(uint32_t events) { void ConnectionImpl::onReadReady() { ENVOY_CONN_LOG(trace, "read ready", *this); + ASSERT(read_enabled_); ASSERT(!connecting_); + // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that + // the transport socket read resumption happens as requested; onReadReady() returns early without + // reading from the transport if the read buffer is above high watermark at the start of the + // method. + transport_wants_read_ = false; IoResult result = transport_socket_->doRead(read_buffer_); uint64_t new_buffer_size = read_buffer_.length(); updateReadBufferStats(result.bytes_processed_, new_buffer_size); diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 90a114b3185f..bf7d8a9c8583 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -111,7 +111,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback // TODO(htuch): While this is the basis for also yielding to other connections to provide some // fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees. // Reconsider how to make fairness happen. - void setReadBufferReady() override { file_event_->activate(Event::FileReadyType::Read); } + void setReadBufferReady() override { + transport_wants_read_ = true; + file_event_->activate(Event::FileReadyType::Read); + } // Obtain global next connection ID. This should only be used in tests. static uint64_t nextGlobalIdForTest() { return next_global_id_; } @@ -178,6 +181,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback bool write_end_stream_ : 1; bool current_write_end_stream_ : 1; bool dispatch_buffered_data_ : 1; + // True if the most recent call to the transport socket's doRead method invoked setReadBufferReady + // to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true, + // readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read + // resumption happens when remaining bytes are held in transport socket internal buffers. + bool transport_wants_read_ : 1; }; /** diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index b13f057cc577..9351311656ee 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1506,6 +1506,61 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) { file_ready_cb_(Event::FileReadyType::Read); } +// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is +// re-enabled. +TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) { + InSequence s; + + std::shared_ptr read_filter(new StrictMock()); + connection_->enableHalfClose(true); + connection_->addReadFilter(read_filter); + + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + // No calls to activate when re-enabling if there are no pending read requests. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); + + // setReadBufferReady triggers an immediate call to activate. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->setReadBufferReady(); + + // When processing a sequence of read disable/read enable, changes to the enabled event mask + // happen only when the disable count transitions to/from 0. + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write)); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(true); + connection_->readDisable(false); + connection_->readDisable(false); + EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write)); + // Expect a read activation since there have been no transport doRead calls since the call to + // setReadBufferReady. + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Disable read. + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + + // Expect a read activate when re-enabling since the file ready cb has not done a read. + EXPECT_CALL(*file_event_, setEnabled(_)); + EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read)); + connection_->readDisable(false); + + // Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled. + EXPECT_CALL(*transport_socket_, doRead(_)) + .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false})); + file_ready_cb_(Event::FileReadyType::Read); + EXPECT_CALL(*file_event_, setEnabled(_)); + connection_->readDisable(true); + EXPECT_CALL(*file_event_, setEnabled(_)); + // No read activate call. + EXPECT_CALL(*file_event_, activate(_)).Times(0); + connection_->readDisable(false); +} + // Test that BytesSentCb is invoked at the correct times TEST_F(MockTransportConnectionImplTest, BytesSentCallback) { uint64_t bytes_sent = 0; diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc index 6807251c837d..fe1d9ba29aa6 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.cc @@ -19,6 +19,7 @@ #include "extensions/transport_sockets/tls/context_config_impl.h" #include "extensions/transport_sockets/tls/context_manager_impl.h" +#include "test/integration/autonomous_upstream.h" #include "test/integration/integration.h" #include "test/integration/utility.h" #include "test/test_common/network_utility.h" @@ -177,6 +178,103 @@ TEST_P(SslIntegrationTest, AdminCertEndpoint) { EXPECT_EQ("200", response->headers().Status()->value().getStringView()); } +class RawWriteSslIntegrationTest : public SslIntegrationTest { +protected: + std::unique_ptr + testFragmentedRequestWithBufferLimit(std::list request_chunks, + uint32_t buffer_limit) { + autonomous_upstream_ = true; + config_helper_.setBufferLimits(buffer_limit, buffer_limit); + initialize(); + + // write_request_cb will write each of the items in request_chunks as a separate SSL_write. + auto write_request_cb = [&request_chunks](Network::ClientConnection& client) { + if (!request_chunks.empty()) { + Buffer::OwnedImpl buffer(request_chunks.front()); + client.write(buffer, false); + request_chunks.pop_front(); + } + }; + + auto client_transport_socket_factory_ptr = + createClientSslTransportSocketFactory({}, *context_manager_, *api_); + std::string response; + auto connection = createConnectionDriver( + lookupPort("http"), write_request_cb, + [&](Network::ClientConnection&, const Buffer::Instance& data) -> void { + response.append(data.toString()); + }, + client_transport_socket_factory_ptr->createTransportSocket({})); + + // Drive the connection until we get a response. + while (response.empty()) { + connection->run(Event::Dispatcher::RunType::NonBlock); + } + EXPECT_THAT(response, testing::HasSubstr("HTTP/1.1 200 OK\r\n")); + + connection->close(); + return reinterpret_cast(fake_upstreams_.front().get()) + ->lastRequestHeaders(); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, RawWriteSslIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcessingHeaders) { + // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were + // picked such that the connection's high watermark will trigger while processing the last SSL + // record containing the request headers. Verify that read resumption works correctly after + // hitting the receive buffer high watermark. + std::list request_chunks = { + "GET / HTTP/1.1\r\nHost: host\r\n", + "key1:" + std::string(14000, 'a') + "\r\n", + "key2:" + std::string(16000, 'b') + "\r\n\r\n", + }; + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); + EXPECT_EQ(upstream_headers->Host()->value(), "host"); + EXPECT_EQ(std::string(14000, 'a'), + upstream_headers->get(Envoy::Http::LowerCaseString("key1"))->value().getStringView()); + EXPECT_EQ(std::string(16000, 'b'), + upstream_headers->get(Envoy::Http::LowerCaseString("key2"))->value().getStringView()); +} + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingBody) { + // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were + // picked such that the connection's high watermark will trigger while processing the last SSL + // record containing the POST body. Verify that read resumption works correctly after hitting the + // receive buffer high watermark. + std::list request_chunks = { + "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 30000\r\n\r\n", + std::string(14000, 'a'), + std::string(16000, 'a'), + }; + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); +} + +// Regression test for https://github.com/envoyproxy/envoy/issues/12304 +TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingLargerBody) { + std::list request_chunks = { + "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 150000\r\n\r\n", + }; + for (int i = 0; i < 10; ++i) { + request_chunks.push_back(std::string(15000, 'a')); + } + + std::unique_ptr upstream_headers = + testFragmentedRequestWithBufferLimit(request_chunks, 16 * 1024); + ASSERT_TRUE(upstream_headers != nullptr); +} + // Validate certificate selection across different certificate types and client TLS versions. class SslCertficateIntegrationTest : public testing::TestWithParam< diff --git a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h index 133e73bd433e..5af886e851bf 100644 --- a/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h +++ b/test/extensions/transport_sockets/tls/integration/ssl_integration_test.h @@ -36,8 +36,6 @@ class SslIntegrationTestBase : public HttpIntegrationTest { // Set this true to debug SSL handshake issues with openssl s_client. The // verbose trace will be in the logs, openssl must be installed separately. bool debug_with_s_client_{false}; - -private: std::unique_ptr context_manager_; }; diff --git a/test/integration/integration.h b/test/integration/integration.h index 725e87655fdd..4ed1fd51dc8e 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -346,6 +346,22 @@ class BaseIntegrationTest : protected Logger::Loggable { void sendRawHttpAndWaitForResponse(int port, const char* raw_http, std::string* response, bool disconnect_after_headers_complete = false); + /** + * Helper to create ConnectionDriver. + * + * @param port the port to connect to. + * @param write_request_cb callback used to send data. + * @param data_callback the callback on the received data. + * @param transport_socket transport socket to use for the client connection + **/ + std::unique_ptr createConnectionDriver( + uint32_t port, RawConnectionDriver::DoWriteCallback write_request_cb, + std::function&& data_callback, + Network::TransportSocketPtr transport_socket) { + return std::make_unique(port, write_request_cb, data_callback, version_, + std::move(transport_socket)); + } + protected: // Create the envoy server in another thread and start it. // Will not return until that server is listening. diff --git a/test/integration/utility.cc b/test/integration/utility.cc index ca1f05955b02..76e152169685 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -118,7 +118,7 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia api_ = Api::createApiForTest(stats_store_); Event::GlobalTimeSystem time_system; dispatcher_ = api_->allocateDispatcher(); - callbacks_ = std::make_unique(); + callbacks_ = std::make_unique([]() {}); client_ = dispatcher_->createClientConnection( Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)), @@ -129,6 +129,30 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia client_->connect(); } +RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, + ReadCallback response_data_callback, + Network::Address::IpVersion version, + Network::TransportSocketPtr transport_socket) { + api_ = Api::createApiForTest(stats_store_); + Event::GlobalTimeSystem time_system; + dispatcher_ = api_->allocateDispatcher(); + callbacks_ = std::make_unique( + [this, write_request_callback]() { write_request_callback(*client_); }); + client_ = dispatcher_->createClientConnection( + Network::Utility::resolveUrl( + fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)), + Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr); + // ConnectionCallbacks will call write_request_callback from the connect and low-watermark + // callbacks. Set a small buffer limit so high-watermark is triggered after every write and + // low-watermark is triggered every time the buffer is drained. + client_->setBufferLimits(1); + + client_->addConnectionCallbacks(*callbacks_); + client_->addReadFilter( + Network::ReadFilterSharedPtr{new ForwardingFilter(*this, response_data_callback)}); + client_->connect(); +} + RawConnectionDriver::~RawConnectionDriver() = default; void RawConnectionDriver::run(Event::Dispatcher::RunType run_type) { dispatcher_->run(run_type); } diff --git a/test/integration/utility.h b/test/integration/utility.h index 6554234ddc95..1815d3c89147 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -59,8 +59,13 @@ using BufferingStreamDecoderPtr = std::unique_ptr; */ class RawConnectionDriver { public: + using DoWriteCallback = std::function; using ReadCallback = std::function; + RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, + ReadCallback response_data_callback, Network::Address::IpVersion version, + Network::TransportSocketPtr transport_socket); + // Similar to the constructor above but accepts the request as a constructor argument. RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, ReadCallback data_callback, Network::Address::IpVersion version); ~RawConnectionDriver(); @@ -75,29 +80,45 @@ class RawConnectionDriver { private: struct ForwardingFilter : public Network::ReadFilterBaseImpl { ForwardingFilter(RawConnectionDriver& parent, ReadCallback cb) - : parent_(parent), data_callback_(cb) {} + : parent_(parent), response_data_callback_(cb) {} // Network::ReadFilter Network::FilterStatus onData(Buffer::Instance& data, bool) override { - data_callback_(*parent_.client_, data); + response_data_callback_(*parent_.client_, data); data.drain(data.length()); return Network::FilterStatus::StopIteration; } RawConnectionDriver& parent_; - ReadCallback data_callback_; + ReadCallback response_data_callback_; }; struct ConnectionCallbacks : public Network::ConnectionCallbacks { + using WriteCb = std::function; + + ConnectionCallbacks(WriteCb write_cb) : write_cb_(write_cb) {} + bool connected() const { return connected_; } + bool closed() const { return closed_; } + + // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { + if (!connected_ && event == Network::ConnectionEvent::Connected) { + write_cb_(); + } + last_connection_event_ = event; connecting_ = false; } void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} + void onBelowWriteBufferLowWatermark() override { write_cb_(); } bool connecting_{true}; Network::ConnectionEvent last_connection_event_; + + private: + WriteCb write_cb_; + bool connected_{false}; + bool closed_{false}; }; Stats::IsolatedStoreImpl stats_store_;