Skip to content

Commit

Permalink
healthcheck: fix grpc inline removal crashes (#749)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Klein <[email protected]>
Signed-off-by: Pradeep Rao <[email protected]>
  • Loading branch information
pradeepcrao committed Jun 8, 2022
1 parent 5d62efe commit 5807212
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 7 deletions.
26 changes: 19 additions & 7 deletions source/common/upstream/health_checker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -778,10 +778,17 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onGoAway(
// Even if we have active health check probe, fail it on GOAWAY and schedule new one.
if (request_encoder_) {
handleFailure(envoy::data::core::v3::NETWORK);
expect_reset_ = true;
request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
// request_encoder_ can already be destroyed if the host was removed during the failure callback
// above.
if (request_encoder_ != nullptr) {
expect_reset_ = true;
request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
}
}
// client_ can already be destroyed if the host was removed during the failure callback above.
if (client_ != nullptr) {
client_->close();
}
client_->close();
}

bool GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::isHealthCheckSucceeded(
Expand Down Expand Up @@ -815,12 +822,17 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onRpcComplete(
if (end_stream) {
resetState();
} else {
// resetState() will be called by onResetStream().
expect_reset_ = true;
request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
// request_encoder_ can already be destroyed if the host was removed during the failure callback
// above.
if (request_encoder_ != nullptr) {
// resetState() will be called by onResetStream().
expect_reset_ = true;
request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
}
}

if (!parent_.reuse_connection_ || goaway) {
// client_ can already be destroyed if the host was removed during the failure callback above.
if (client_ != nullptr && (!parent_.reuse_connection_ || goaway)) {
client_->close();
}
}
Expand Down
64 changes: 64 additions & 0 deletions test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4528,6 +4528,70 @@ TEST_F(GrpcHealthCheckerImplTest, SuccessStartFailedFailFirst) {
expectHostHealthy(true);
}

// Verify functionality when a host is removed inline with a failure via RPC that was proceeded
// by a GOAWAY.
TEST_F(GrpcHealthCheckerImplTest, GrpcHealthFailViaRpcRemoveHostInCallback) {
setupHC();
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {
makeTestHost(cluster_->info_, "tcp://127.0.0.1:80", simTime())};

expectSessionCreate();
expectHealthcheckStart(0);
EXPECT_CALL(event_logger_, logUnhealthy(_, _, _, true));
health_checker_->start();

EXPECT_CALL(*this, onHostStatus(_, HealthTransition::Changed))
.WillOnce(Invoke([&](HostSharedPtr host, HealthTransition) {
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {};
cluster_->prioritySet().runUpdateCallbacks(0, {}, {host});
}));
EXPECT_CALL(event_logger_, logEjectUnhealthy(_, _, _));
test_sessions_[0]->codec_client_->raiseGoAway(Http::GoAwayErrorCode::NoError);
respondServiceStatus(0, grpc::health::v1::HealthCheckResponse::NOT_SERVING);
}

// Verify functionality when a host is removed inline with a failure via an error GOAWAY.
TEST_F(GrpcHealthCheckerImplTest, GrpcHealthFailViaGoawayRemoveHostInCallback) {
setupHCWithUnhealthyThreshold(/*threshold=*/1);
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {
makeTestHost(cluster_->info_, "tcp://127.0.0.1:80", simTime())};

expectSessionCreate();
expectHealthcheckStart(0);
EXPECT_CALL(event_logger_, logUnhealthy(_, _, _, true));
health_checker_->start();

EXPECT_CALL(*this, onHostStatus(_, HealthTransition::Changed))
.WillOnce(Invoke([&](HostSharedPtr host, HealthTransition) {
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {};
cluster_->prioritySet().runUpdateCallbacks(0, {}, {host});
}));
EXPECT_CALL(event_logger_, logEjectUnhealthy(_, _, _));
test_sessions_[0]->codec_client_->raiseGoAway(Http::GoAwayErrorCode::Other);
}

// Verify functionality when a host is removed inline with by a bad RPC response.
TEST_F(GrpcHealthCheckerImplTest, GrpcHealthFailViaBadResponseRemoveHostInCallback) {
setupHCWithUnhealthyThreshold(/*threshold=*/1);
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {
makeTestHost(cluster_->info_, "tcp://127.0.0.1:80", simTime())};

expectSessionCreate();
expectHealthcheckStart(0);
EXPECT_CALL(event_logger_, logUnhealthy(_, _, _, true));
health_checker_->start();

EXPECT_CALL(*this, onHostStatus(_, HealthTransition::Changed))
.WillOnce(Invoke([&](HostSharedPtr host, HealthTransition) {
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {};
cluster_->prioritySet().runUpdateCallbacks(0, {}, {host});
}));
EXPECT_CALL(event_logger_, logEjectUnhealthy(_, _, _));
std::unique_ptr<Http::TestResponseHeaderMapImpl> response_headers(
new Http::TestResponseHeaderMapImpl{{":status", "500"}});
test_sessions_[0]->stream_response_callbacks_->decodeHeaders(std::move(response_headers), false);
}

// Test host recovery after explicit check failure requires several successful checks.
TEST_F(GrpcHealthCheckerImplTest, GrpcHealthFail) {
setupHC();
Expand Down

0 comments on commit 5807212

Please sign in to comment.