Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Asra Ali <[email protected]>
  • Loading branch information
asraa committed Nov 30, 2020
1 parent af17f95 commit b48aee6
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 13 deletions.
3 changes: 2 additions & 1 deletion source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ HttpConnPoolImplBase::HttpConnPoolImplBase(
: Envoy::ConnectionPool::ConnPoolImplBase(
host, priority, dispatcher, options,
wrapTransportSocketOptions(transport_socket_options, protocols), state),
random_generator_(random_generator) {
random_generator_(random_generator),
upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {
ASSERT(!protocols.empty());
}

Expand Down
4 changes: 4 additions & 0 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,

virtual CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE;
Random::RandomGenerator& randomGenerator() { return random_generator_; }
Event::SchedulableCallback* upstreamReadyCallback() { return upstream_ready_cb_.get(); }

protected:
friend class ActiveClient;
Random::RandomGenerator& random_generator_;

// onUpstreamReady callback only used by HTTP/1.1 implementation.
Event::SchedulableCallbackPtr upstream_ready_cb_;
};

// An implementation of Envoy::ConnectionPool::ActiveClient for HTTP/1.1 and HTTP/2
Expand Down
5 changes: 4 additions & 1 deletion source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ void ActiveClient::StreamWrapper::onDecodeComplete() {
parent_.codec_client_->close();
} else {
auto* pool = &parent_.parent();
pool->dispatcher().post([pool]() -> void { pool->onUpstreamReady(); });
if (pool->hasPendingStreams()) {
// SchedulableCallback guarantees callback will only be scheduled if not already enabled.
pool->upstreamReadyCallback()->scheduleCallbackCurrentIteration();
}
parent_.stream_wrapper_.reset();

pool->checkForDrained();
Expand Down
78 changes: 67 additions & 11 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public FixedHt
public:
ConnPoolImplForTest(Event::MockDispatcher& dispatcher,
Upstream::ClusterInfoConstSharedPtr cluster,
Random::RandomGenerator& random_generator)
Random::RandomGenerator& random_generator,
NiceMock<Event::MockSchedulableCallback>* upstream_ready_cb)
: FixedHttpConnPoolImpl(
Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000", dispatcher.timeSource()),
Upstream::ResourcePriority::Default, dispatcher, nullptr, nullptr, random_generator,
Expand All @@ -62,7 +63,8 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public FixedHt
return nullptr; // Not used: createCodecClient overloaded.
},
std::vector<Protocol>{Protocol::Http11}),
api_(Api::createApiForTest()), mock_dispatcher_(dispatcher) {}
api_(Api::createApiForTest()), mock_dispatcher_(dispatcher),
mock_upstream_ready_cb_(upstream_ready_cb) {}

~ConnPoolImplForTest() override {
EXPECT_EQ(0U, ready_clients_.size());
Expand Down Expand Up @@ -118,15 +120,17 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public FixedHt
}

void expectEnableUpstreamReady() {
EXPECT_CALL(mock_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb_));
EXPECT_CALL(*mock_upstream_ready_cb_, scheduleCallbackCurrentIteration())
.Times(1)
.RetiresOnSaturation();
}

void expectAndRunUpstreamReady() { post_cb_(); }
void expectAndRunUpstreamReady() { mock_upstream_ready_cb_->invokeCallback(); }

Upstream::ClusterConnectivityState state_;
Api::ApiPtr api_;
Event::MockDispatcher& mock_dispatcher_;
Event::PostCb post_cb_;
NiceMock<Event::MockSchedulableCallback>* mock_upstream_ready_cb_;
std::vector<TestCodecClient> test_clients_;
};

Expand All @@ -136,7 +140,9 @@ class ConnPoolImplForTest : public Event::TestUsingSimulatedTime, public FixedHt
class Http1ConnPoolImplTest : public testing::Test {
public:
Http1ConnPoolImplTest()
: conn_pool_(std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_, random_)) {}
: upstream_ready_cb_(new NiceMock<Event::MockSchedulableCallback>(&dispatcher_)),
conn_pool_(std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_, random_,
upstream_ready_cb_)) {}

~Http1ConnPoolImplTest() override {
EXPECT_EQ("", TestUtility::nonZeroedGauges(cluster_->stats_store_.gauges()));
Expand All @@ -145,6 +151,7 @@ class Http1ConnPoolImplTest : public testing::Test {
NiceMock<Random::MockRandomGenerator> random_;
NiceMock<Event::MockDispatcher> dispatcher_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_{new NiceMock<Upstream::MockClusterInfo>()};
NiceMock<Event::MockSchedulableCallback>* upstream_ready_cb_;
std::unique_ptr<ConnPoolImplForTest> conn_pool_;
NiceMock<Runtime::MockLoader> runtime_;
};
Expand Down Expand Up @@ -290,13 +297,16 @@ TEST_F(Http1ConnPoolImplTest, VerifyAlpnFallback) {
}));
cluster_->transport_socket_matcher_ =
std::make_unique<NiceMock<Upstream::MockTransportSocketMatcher>>(std::move(factory));
new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);

// Recreate the conn pool so that the host re-evaluates the transport socket match, arriving at
// our test transport socket factory.
conn_pool_ = std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_, random_);
conn_pool_ =
std::make_unique<ConnPoolImplForTest>(dispatcher_, cluster_, random_, upstream_ready_cb_);
NiceMock<MockResponseDecoder> outer_decoder;
ConnPoolCallbacks callbacks;
conn_pool_->expectClientCreate(Protocol::Http11);

Http::ConnectionPool::Cancellable* handle = conn_pool_->newStream(outer_decoder, callbacks);
EXPECT_NE(nullptr, handle);

Expand Down Expand Up @@ -646,7 +656,6 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
inner_decoder->decodeHeaders(std::move(response_headers), true);

conn_pool_->expectAndRunUpstreamReady();
conn_pool_->expectEnableUpstreamReady();
EXPECT_TRUE(
callbacks2.outer_encoder_
->encodeHeaders(TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true)
Expand Down Expand Up @@ -716,7 +725,6 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {
EXPECT_CALL(callbacks2.pool_ready_, ready());
conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

conn_pool_->expectEnableUpstreamReady();
EXPECT_TRUE(
callbacks2.outer_encoder_
->encodeHeaders(TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true)
Expand Down Expand Up @@ -978,9 +986,7 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) {
r3.startRequest();
EXPECT_EQ(3U, cluster_->stats_.upstream_rq_total_.value());

conn_pool_->expectEnableUpstreamReady();
r2.completeResponse(false);
conn_pool_->expectEnableUpstreamReady();
r3.completeResponse(false);

// Disconnect both clients.
Expand Down Expand Up @@ -1125,6 +1131,56 @@ TEST_F(Http1ConnPoolImplTest, PendingRequestIsConsideredActive) {
EXPECT_EQ(1U, cluster_->stats_.upstream_cx_destroy_local_.value());
}

class ConnPoolImplNoDestructForTest : public ConnPoolImplForTest {
public:
ConnPoolImplNoDestructForTest(Event::MockDispatcher& dispatcher,
Upstream::ClusterInfoConstSharedPtr cluster,
Random::RandomGenerator& random_generator,
NiceMock<Event::MockSchedulableCallback>* upstream_ready_cb)
: ConnPoolImplForTest(dispatcher, cluster, random_generator, upstream_ready_cb) {}

~ConnPoolImplNoDestructForTest() override { destructAllConnections(); }
};

// Regression test for connection pool use after free when dispatcher runs posted callback
// conn_pool->onUpstreamReady() after connection pool is destroyed.
TEST_F(Http1ConnPoolImplTest, UseAfterFreeConnPool) {
// Recreate conn pool without destructor checks.
new NiceMock<Event::MockSchedulableCallback>(&dispatcher_);
conn_pool_ = std::make_unique<ConnPoolImplNoDestructForTest>(dispatcher_, cluster_, random_,
upstream_ready_cb_);

InSequence s;

NiceMock<MockResponseDecoder> outer_decoder;
ConnPoolCallbacks callbacks;
conn_pool_->expectClientCreate();
Http::ConnectionPool::Cancellable* handle = conn_pool_->newStream(outer_decoder, callbacks);
EXPECT_NE(nullptr, handle);

NiceMock<MockRequestEncoder> request_encoder;
ResponseDecoder* inner_decoder;
EXPECT_CALL(*conn_pool_->test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks.pool_ready_, ready());
EXPECT_CALL(*conn_pool_->test_clients_[0].connect_timer_, disableTimer());
conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

EXPECT_TRUE(
callbacks.outer_encoder_
->encodeHeaders(TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true)
.ok());

EXPECT_CALL(*conn_pool_, onClientDestroy());
dispatcher_.deferredDelete(std::move(conn_pool_));
inner_decoder->decodeHeaders(
ResponseHeaderMapPtr{new TestResponseHeaderMapImpl{{":status", "200"}}}, true);
dispatcher_.clearDeferredDeleteList();

// TODO(asraa): Clean this leak.
delete upstream_ready_cb_;
}

} // namespace
} // namespace Http1
} // namespace Http
Expand Down

0 comments on commit b48aee6

Please sign in to comment.