Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport to v1.13: connection: Remember transport socket read resumption requests and replay them when re-enabling read. (#13772) (#14173) #14255

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Version history
================
Changes
-------
* tls: fixed read resumption after the buffer high-watermark is triggered when all remaining request/response bytes are stored in the SSL connection's internal buffers.
* udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash.

1.13.6 (September 29, 2020)
Expand Down
17 changes: 15 additions & 2 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
[this]() -> void { this->onHighWatermark(); })),
read_enabled_(true), above_high_watermark_(false), detect_early_close_(true),
enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) {
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
transport_wants_read_(false) {
// Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM
// condition and just crash.
RELEASE_ASSERT(ioHandle().fd() != -1, "");
Expand Down Expand Up @@ -339,7 +340,13 @@ void ConnectionImpl::readDisable(bool disable) {
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Instead fake an event to make sure the buffered data
// gets processed regardless and ensure that we dispatch it via onRead.
if (read_buffer_.length() > 0) {
if (read_buffer_.length() > 0 || transport_wants_read_) {
// If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
// able to process additional bytes even if there is no data in the kernel to kick off the
// filter chain. Alternately if the read buffer has data the fd could be read disabled. To
// handle these cases, fake an event to make sure the buffered data in the read buffer or in
// transport socket internal buffers gets processed regardless and ensure that we dispatch it
// via onRead.
dispatch_buffered_data_ = true;
file_event_->activate(Event::FileReadyType::Read);
}
Expand Down Expand Up @@ -509,9 +516,15 @@ void ConnectionImpl::onFileEvent(uint32_t events) {

void ConnectionImpl::onReadReady() {
ENVOY_CONN_LOG(trace, "read ready", *this);
ASSERT(read_enabled_);

ASSERT(!connecting_);

// Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
// the transport socket read resumption happens as requested; onReadReady() returns early without
// reading from the transport if the read buffer is above high watermark at the start of the
// method.
transport_wants_read_ = false;
IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
Expand Down
10 changes: 9 additions & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() override { file_event_->activate(Event::FileReadyType::Read); }
void setReadBufferReady() override {
// Record that the transport socket asked for another read before scheduling the read event.
// readDisable() consults this flag when deciding whether to activate a read event, and
// onReadReady() clears it just before calling the transport socket's doRead().
transport_wants_read_ = true;
file_event_->activate(Event::FileReadyType::Read);
}

// Obtain global next connection ID. This should only be used in tests.
static uint64_t nextGlobalIdForTest() { return next_global_id_; }
Expand Down Expand Up @@ -178,6 +181,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
bool write_end_stream_ : 1;
bool current_write_end_stream_ : 1;
bool dispatch_buffered_data_ : 1;
// True if the most recent call to the transport socket's doRead method invoked setReadBufferReady
// to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true,
// readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read
// resumption happens when remaining bytes are held in transport socket internal buffers.
bool transport_wants_read_ : 1;
};

/**
Expand Down
55 changes: 55 additions & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,61 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) {
file_ready_cb_(Event::FileReadyType::Read);
}

// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is
// re-enabled, and that the pending request is dropped after the transport socket's doRead() has
// been invoked.
TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
  // Every expectation below must be matched in the order in which it is declared.
  InSequence s;

  // Use make_shared rather than handing a naked new expression to the shared_ptr constructor.
  std::shared_ptr<MockReadFilter> read_filter = std::make_shared<StrictMock<MockReadFilter>>();
  connection_->enableHalfClose(true);
  connection_->addReadFilter(read_filter);

  EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
  connection_->readDisable(true);
  EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
  // No calls to activate when re-enabling if there are no pending read requests.
  EXPECT_CALL(*file_event_, activate(_)).Times(0);
  connection_->readDisable(false);

  // setReadBufferReady triggers an immediate call to activate.
  EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
  connection_->setReadBufferReady();

  // When processing a sequence of read disable/read enable, changes to the enabled event mask
  // happen only when the disable count transitions to/from 0.
  EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
  connection_->readDisable(true);
  connection_->readDisable(true);
  connection_->readDisable(true);
  connection_->readDisable(false);
  connection_->readDisable(false);
  EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
  // Expect a read activation since there have been no transport doRead calls since the call to
  // setReadBufferReady.
  EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
  connection_->readDisable(false);

  // Disable read.
  EXPECT_CALL(*file_event_, setEnabled(_));
  connection_->readDisable(true);

  // Expect a read activate when re-enabling since the file ready cb has not done a read.
  EXPECT_CALL(*file_event_, setEnabled(_));
  EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
  connection_->readDisable(false);

  // Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled.
  EXPECT_CALL(*transport_socket_, doRead(_))
      .WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
  file_ready_cb_(Event::FileReadyType::Read);
  EXPECT_CALL(*file_event_, setEnabled(_));
  connection_->readDisable(true);
  EXPECT_CALL(*file_event_, setEnabled(_));
  // No read activate call.
  EXPECT_CALL(*file_event_, activate(_)).Times(0);
  connection_->readDisable(false);
}

// Test that BytesSentCb is invoked at the correct times
TEST_F(MockTransportConnectionImplTest, BytesSentCallback) {
uint64_t bytes_sent = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "extensions/transport_sockets/tls/context_config_impl.h"
#include "extensions/transport_sockets/tls/context_manager_impl.h"

#include "test/integration/autonomous_upstream.h"
#include "test/integration/integration.h"
#include "test/integration/utility.h"
#include "test/test_common/network_utility.h"
Expand Down Expand Up @@ -177,6 +178,103 @@ TEST_P(SslIntegrationTest, AdminCertEndpoint) {
EXPECT_EQ("200", response->headers().Status()->value().getStringView());
}

// Fixture for TLS integration tests that send a raw HTTP/1.1 request as a sequence of separate
// writes while the Envoy under test runs with a configurable buffer limit.
class RawWriteSslIntegrationTest : public SslIntegrationTest {
protected:
  /**
   * Sends the given request chunks over a TLS client connection to the "http" listener and waits
   * for a response.
   *
   * @param request_chunks the pieces of the request; each piece is written separately.
   * @param buffer_limit value passed as both arguments of config_helper_.setBufferLimits().
   * @return the result of lastRequestHeaders() on the first fake upstream.
   */
  std::unique_ptr<Http::TestHeaderMapImpl>
  testFragmentedRequestWithBufferLimit(std::list<std::string> request_chunks,
                                       uint32_t buffer_limit) {
    autonomous_upstream_ = true;
    config_helper_.setBufferLimits(buffer_limit, buffer_limit);
    initialize();

    // write_request_cb will write each of the items in request_chunks as a separate SSL_write.
    // request_chunks is captured by reference: it is a by-value parameter of this function and
    // the connection driver that holds the callback is a local destroyed before this returns.
    auto write_request_cb = [&request_chunks](Network::ClientConnection& client) {
      if (!request_chunks.empty()) {
        Buffer::OwnedImpl buffer(request_chunks.front());
        client.write(buffer, false);
        request_chunks.pop_front();
      }
    };

    auto client_transport_socket_factory_ptr =
        createClientSslTransportSocketFactory({}, *context_manager_, *api_);
    std::string response;
    auto connection = createConnectionDriver(
        lookupPort("http"), write_request_cb,
        [&](Network::ClientConnection&, const Buffer::Instance& data) -> void {
          response.append(data.toString());
        },
        client_transport_socket_factory_ptr->createTransportSocket({}));

    // Drive the connection until we get a response.
    while (response.empty()) {
      connection->run(Event::Dispatcher::RunType::NonBlock);
    }
    EXPECT_THAT(response, testing::HasSubstr("HTTP/1.1 200 OK\r\n"));

    connection->close();
    // Downcast with static_cast so the compiler performs a real base-to-derived pointer
    // conversion; reinterpret_cast would only reinterpret the pointer bits.
    // NOTE(review): assumes AutonomousUpstream publicly derives from the element type of
    // fake_upstreams_ and that autonomous_upstream_ = true makes the first entry one — confirm.
    return static_cast<AutonomousUpstream*>(fake_upstreams_.front().get())->lastRequestHeaders();
  }
};

// Instantiate every RawWriteSslIntegrationTest case once for each IP version returned by
// TestEnvironment::getIpVersionsForTest(); TestUtility::ipTestParamsToString is passed as the
// instance name generator.
INSTANTIATE_TEST_SUITE_P(IpVersions, RawWriteSslIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcessingHeaders) {
  // The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were
  // picked such that the connection's high watermark will trigger while processing the last SSL
  // record containing the request headers. Verify that read resumption works correctly after
  // hitting the receive buffer high watermark.
  std::list<std::string> request_chunks = {
      "GET / HTTP/1.1\r\nHost: host\r\n",
      "key1:" + std::string(14000, 'a') + "\r\n",
      "key2:" + std::string(16000, 'b') + "\r\n\r\n",
  };

  std::unique_ptr<Http::TestHeaderMapImpl> upstream_headers =
      testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024);
  ASSERT_TRUE(upstream_headers != nullptr);

  // Check that each header entry exists before dereferencing it, so a missing header is reported
  // as a test failure instead of crashing the test binary.
  const auto* host = upstream_headers->Host();
  ASSERT_TRUE(host != nullptr);
  EXPECT_EQ(host->value(), "host");

  const auto* key1 = upstream_headers->get(Envoy::Http::LowerCaseString("key1"));
  ASSERT_TRUE(key1 != nullptr);
  EXPECT_EQ(std::string(14000, 'a'), key1->value().getStringView());

  const auto* key2 = upstream_headers->get(Envoy::Http::LowerCaseString("key2"));
  ASSERT_TRUE(key2 != nullptr);
  EXPECT_EQ(std::string(16000, 'b'), key2->value().getStringView());
}

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingBody) {
  // Every entry is sent by the raw writer with its own SSL_write. The sizes are chosen so that
  // the connection's high watermark triggers while the last SSL record carrying the POST body is
  // being processed; the request must still complete, which requires read resumption to work
  // after the receive buffer high watermark was hit.
  std::list<std::string> chunks;
  chunks.emplace_back("POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 30000\r\n\r\n");
  chunks.emplace_back(14000, 'a');
  chunks.emplace_back(16000, 'a');

  const auto headers = testFragmentedRequestWithBufferLimit(chunks, 15 * 1024);
  ASSERT_TRUE(headers != nullptr);
}

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingLargerBody) {
  // Request headers announcing a 150000 byte body, followed by ten body chunks of 15000 bytes.
  std::list<std::string> chunks{
      "POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 150000\r\n\r\n",
  };
  chunks.insert(chunks.end(), 10, std::string(15000, 'a'));

  const auto headers = testFragmentedRequestWithBufferLimit(chunks, 16 * 1024);
  ASSERT_TRUE(headers != nullptr);
}

// Validate certificate selection across different certificate types and client TLS versions.
class SslCertficateIntegrationTest
: public testing::TestWithParam<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class SslIntegrationTestBase : public HttpIntegrationTest {
// Set this true to debug SSL handshake issues with openssl s_client. The
// verbose trace will be in the logs, openssl must be installed separately.
bool debug_with_s_client_{false};

private:
std::unique_ptr<ContextManager> context_manager_;
};

Expand Down
16 changes: 16 additions & 0 deletions test/integration/integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,22 @@ class BaseIntegrationTest : protected Logger::Loggable<Logger::Id::testing> {
void sendRawHttpAndWaitForResponse(int port, const char* raw_http, std::string* response,
bool disconnect_after_headers_complete = false);

/**
 * Helper to create a RawConnectionDriver whose request is written through a callback.
 *
 * @param port the port to connect to.
 * @param write_request_cb callback used to send data.
 * @param data_callback the callback on the received data.
 * @param transport_socket transport socket to use for the client connection.
 * @return the newly created connection driver.
 **/
std::unique_ptr<RawConnectionDriver> createConnectionDriver(
    uint32_t port, RawConnectionDriver::DoWriteCallback write_request_cb,
    std::function<void(Network::ClientConnection&, const Buffer::Instance&)>&& data_callback,
    Network::TransportSocketPtr transport_socket) {
  // Both callbacks are handed over to the driver: move them instead of copying the
  // std::function objects (data_callback is already taken by rvalue reference).
  return std::make_unique<RawConnectionDriver>(port, std::move(write_request_cb),
                                               std::move(data_callback), version_,
                                               std::move(transport_socket));
}

protected:
// Create the envoy server in another thread and start it.
// Will not return until that server is listening.
Expand Down
26 changes: 25 additions & 1 deletion test/integration/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia
api_ = Api::createApiForTest(stats_store_);
Event::GlobalTimeSystem time_system;
dispatcher_ = api_->allocateDispatcher();
callbacks_ = std::make_unique<ConnectionCallbacks>();
callbacks_ = std::make_unique<ConnectionCallbacks>([]() {});
client_ = dispatcher_->createClientConnection(
Network::Utility::resolveUrl(
fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)),
Expand All @@ -129,6 +129,30 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia
client_->connect();
}

// Creates a client connection to the loopback address of the given IP version and port, using
// the supplied transport socket. write_request_callback is invoked (with the client connection)
// from the connection callbacks to produce the request; response_data_callback receives the
// data read from the connection.
RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback,
                                         ReadCallback response_data_callback,
                                         Network::Address::IpVersion version,
                                         Network::TransportSocketPtr transport_socket) {
  api_ = Api::createApiForTest(stats_store_);
  // NOTE(review): time_system is not referenced again in this constructor; presumably it is
  // constructed for its side effects — confirm before removing.
  Event::GlobalTimeSystem time_system;
  dispatcher_ = api_->allocateDispatcher();
  // The callback is owned by the lambda from here on, so move it in rather than copying it.
  callbacks_ = std::make_unique<ConnectionCallbacks>(
      [this, write_cb = std::move(write_request_callback)]() { write_cb(*client_); });
  client_ = dispatcher_->createClientConnection(
      Network::Utility::resolveUrl(
          fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)),
      Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr);
  // ConnectionCallbacks will call write_request_callback from the connect and low-watermark
  // callbacks. Set a small buffer limit so high-watermark is triggered after every write and
  // low-watermark is triggered every time the buffer is drained.
  client_->setBufferLimits(1);

  client_->addConnectionCallbacks(*callbacks_);
  // make_shared instead of a naked new expression wrapped in a shared pointer.
  client_->addReadFilter(
      std::make_shared<ForwardingFilter>(*this, std::move(response_data_callback)));
  client_->connect();
}

RawConnectionDriver::~RawConnectionDriver() = default;

void RawConnectionDriver::run(Event::Dispatcher::RunType run_type) { dispatcher_->run(run_type); }
Expand Down
29 changes: 25 additions & 4 deletions test/integration/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ using BufferingStreamDecoderPtr = std::unique_ptr<BufferingStreamDecoder>;
*/
class RawConnectionDriver {
public:
using DoWriteCallback = std::function<void(Network::ClientConnection&)>;
using ReadCallback = std::function<void(Network::ClientConnection&, const Buffer::Instance&)>;

RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback,
ReadCallback response_data_callback, Network::Address::IpVersion version,
Network::TransportSocketPtr transport_socket);
// Similar to the constructor above but accepts the request as a constructor argument.
RawConnectionDriver(uint32_t port, Buffer::Instance& initial_data, ReadCallback data_callback,
Network::Address::IpVersion version);
~RawConnectionDriver();
Expand All @@ -75,29 +80,45 @@ class RawConnectionDriver {
private:
struct ForwardingFilter : public Network::ReadFilterBaseImpl {
ForwardingFilter(RawConnectionDriver& parent, ReadCallback cb)
: parent_(parent), data_callback_(cb) {}
: parent_(parent), response_data_callback_(cb) {}

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data, bool) override {
data_callback_(*parent_.client_, data);
response_data_callback_(*parent_.client_, data);
data.drain(data.length());
return Network::FilterStatus::StopIteration;
}

RawConnectionDriver& parent_;
ReadCallback data_callback_;
ReadCallback response_data_callback_;
};

struct ConnectionCallbacks : public Network::ConnectionCallbacks {
using WriteCb = std::function<void()>;

ConnectionCallbacks(WriteCb write_cb) : write_cb_(write_cb) {}
bool connected() const { return connected_; }
bool closed() const { return closed_; }

// Network::ConnectionCallbacks
// Records the connection event and triggers the first write once the connection is established.
void onEvent(Network::ConnectionEvent event) override {
  if (!connected_ && event == Network::ConnectionEvent::Connected) {
    write_cb_();
  }

  last_connection_event_ = event;
  connecting_ = false;
  // Keep the state reported by connected() and closed() up to date. Without these updates
  // neither flag is ever written, so both accessors always return false and the !connected_
  // guard above has no effect.
  connected_ |= (event == Network::ConnectionEvent::Connected);
  closed_ |= (event == Network::ConnectionEvent::RemoteClose ||
              event == Network::ConnectionEvent::LocalClose);
}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}
void onBelowWriteBufferLowWatermark() override { write_cb_(); }

bool connecting_{true};
Network::ConnectionEvent last_connection_event_;

private:
WriteCb write_cb_;
bool connected_{false};
bool closed_{false};
};

Stats::IsolatedStoreImpl stats_store_;
Expand Down