diff --git a/source/extensions/config_subscription/grpc/grpc_mux_failover.h b/source/extensions/config_subscription/grpc/grpc_mux_failover.h index 00876af5c555..745e1be9409f 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_failover.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_failover.h @@ -30,19 +30,25 @@ namespace Config { * will be followed by the GrpcMuxFailover calling the GrpcStreamCallbacks on the GrpcMux * object that initialized it. * - * Note: this class is WIP and should be considered alpha! - * At the moment, the supported policy is as follows: + * To simplify the state-machine, Envoy can be in one of the mutually exclusive states: + * ConnectingToPrimary - attempting to connect to the primary source. + * ConnectedToPrimary - after receiving a response from the primary source. + * ConnectingToFailover - attempting to connect to the failover source. + * ConnectedToFailover - after receiving a response from the failover source. + * None - not attempting to connect or connected to any source (e.g., upon initialization). + * * The GrpcMuxFailover attempts to establish a connection to the primary source. Once a response is * received from the primary source it will be considered available, and the failover will not be * used. Any future reconnection attempts will be to the primary source only. * However, if no response is received from the primary source, and accessing the primary has * failed 2 times in a row, the GrpcMuxFailover will attempt to establish a connection to the - * failover source. If a response from the failover source is received, only the failover source - * will be used. - * If the failover source is unavailable, the GrpcMuxFailover will alternate between attempts - * to reconnect to the primary source and the failover source. - * In the future, this behavior may change, and the GrpcMuxFailover will always - * prefer the primary source, even if prior connection to the failover was successful. 
+ * failover source. Envoy will keep alternating between the primary and failover sources attempting + * to connect to one of them. If a response from the failover source is received, it will be the + * source of configuration until the connection is closed. In case the failover connection is + * closed, Envoy will attempt to connect to the primary, before retrying to connect to the failover + * source. If the failover source is unavailable or a connection to it is closed, the + * GrpcMuxFailover will alternate between attempts to reconnect to the primary source and the + * failover source. * TODO(adisuissa): The number of consecutive failures is currently statically * defined, and may be converted to a config field in the future. */ @@ -64,8 +70,7 @@ class GrpcMuxFailover : public GrpcStreamInterface, Event::Dispatcher& dispatcher) : grpc_mux_callbacks_(grpc_mux_callbacks), primary_callbacks_(*this), primary_grpc_stream_(std::move(primary_stream_creator(&primary_callbacks_))), - connecting_to_primary_(false), connecting_to_failover_(false), - connected_to_(ConnectedState::None), ever_connected_to_(ConnectedState::None) { + connection_state_(ConnectionState::None), ever_connected_to_primary_(false) { ASSERT(primary_grpc_stream_ != nullptr); if (failover_stream_creator.has_value()) { ENVOY_LOG(warn, "Using xDS-Failover. Note that the implementation is currently considered " @@ -84,32 +89,34 @@ class GrpcMuxFailover : public GrpcStreamInterface, // Attempts to establish a new stream to the either the primary or failover source. void establishNewStream() override { // Attempt establishing a connection to the primary source. - // This method may be called multiple times, even if the primary stream is already - // established or in the process of being established. + // This method may be called multiple times, even if the primary/failover stream + // is already established or in the process of being established. 
if (complete_retry_timer_) { complete_retry_timer_->disableTimer(); } - // First check if Envoy ever connected to the primary/failover, and if so - // persist attempts to that source. - if (ever_connected_to_ == ConnectedState::Primary) { - ASSERT(!connecting_to_failover_); - ENVOY_LOG_MISC(trace, "Attempting to reconnect to the primary gRPC source, as a connection " - "to it was previously established."); - establishStreamToPrimaryIfNotConnected(); - return; - } else if (ever_connected_to_ == ConnectedState::Failover) { - ASSERT(!connecting_to_primary_); - ENVOY_LOG_MISC(trace, "Attempting to reconnect to the failover gRPC source, as a connection " - "to it was previously established."); - establishStreamToFailoverIfNotConnected(); + // If already connected to one of the sources, return. + if (connection_state_ == ConnectionState::ConnectedToPrimary || + connection_state_ == ConnectionState::ConnectedToFailover) { + ENVOY_LOG_MISC(trace, + "Already connected to an xDS server, skipping establishNewStream() call"); return; } - // No prior connection was established, prefer the primary over the failover. - if (connecting_to_failover_) { - establishStreamToFailoverIfNotConnected(); + // connection_state_ is either None, ConnectingToPrimary or + // ConnectingToFailover. In the first 2 cases try to connect to the primary + // (preferring the primary in the case of None), and in the third case + // try to connect to the failover. + // Note that if a connection to the primary source was ever successful, the + // failover manager will keep setting connection_state_ to either None or + // ConnectingToPrimary, which ensures that only the primary stream will be + // established. + if (connection_state_ == ConnectionState::ConnectingToFailover) { + ASSERT(!ever_connected_to_primary_); + failover_grpc_stream_->establishNewStream(); } else { // Either connecting to primary or connected to it, or neither. 
- establishStreamToPrimaryIfNotConnected(); + ASSERT(connection_state_ == ConnectionState::None || + connection_state_ == ConnectionState::ConnectingToPrimary); + connection_state_ = ConnectionState::ConnectingToPrimary; + primary_grpc_stream_->establishNewStream(); } } @@ -125,6 +132,7 @@ class GrpcMuxFailover : public GrpcStreamInterface, // Sends a message using the underlying stream. void sendMessage(const RequestType& request) override { if (connectingToOrConnectedToFailover()) { + ASSERT(!ever_connected_to_primary_); failover_grpc_stream_->sendMessage(request); return; } @@ -172,10 +180,9 @@ class GrpcMuxFailover : public GrpcStreamInterface, // Retries to connect again to the primary and then (possibly) to the // failover. Assumes that no connection has been made or is being attempted. void retryConnections() { - ASSERT(!connecting_to_primary_ && !connecting_to_failover_ && - (connected_to_ == ConnectedState::None)); + ASSERT(connection_state_ == ConnectionState::None); ENVOY_LOG(trace, "Expired timer, retrying to reconnect to the primary xDS server."); - connecting_to_primary_ = true; + connection_state_ = ConnectionState::ConnectingToPrimary; primary_grpc_stream_->establishNewStream(); } @@ -191,21 +198,19 @@ class GrpcMuxFailover : public GrpcStreamInterface, // considering the primary source as available. // Calling the onStreamEstablished() callback on the GrpcMux object will // trigger the GrpcMux to start sending requests. - ASSERT(parent_.connecting_to_primary_ && !parent_.connecting_to_failover_ && - (parent_.connected_to_ == ConnectedState::None)); + ASSERT(parent_.connection_state_ == ConnectionState::ConnectingToPrimary); parent_.grpc_mux_callbacks_.onStreamEstablished(); } void onEstablishmentFailure() override { // This will be called when the primary stream fails to establish a connection, or after the // connection was closed. 
- ASSERT(parent_.connectingToOrConnectedToPrimary() && - !parent_.connectingToOrConnectedToFailover()); + ASSERT(parent_.connectingToOrConnectedToPrimary()); // If there's no failover supported, this will just be a pass-through // callback. if (parent_.failover_grpc_stream_ != nullptr) { - if (parent_.connecting_to_primary_ && - (parent_.ever_connected_to_ != ConnectedState::Primary)) { + if (parent_.connection_state_ == ConnectionState::ConnectingToPrimary && + !parent_.ever_connected_to_primary_) { // If there are 2 consecutive failures to the primary, Envoy will try to connect to the // failover. primary_consecutive_failures_++; @@ -214,12 +219,14 @@ class GrpcMuxFailover : public GrpcStreamInterface, // Terminate the primary stream and establish a connection to the failover stream. ENVOY_LOG(debug, "Primary xDS stream failed to establish a connection at least 2 times " "in a row. Attempting to connect to the failover stream."); - parent_.connecting_to_primary_ = false; // This will close the stream and prevent the retry timer from // reconnecting to the primary source. + // TODO(adisuissa): need to ensure that when moving between primary and failover, + // the initial_resource_versions that are sent are empty. This will be + // done in a followup PR. parent_.primary_grpc_stream_->closeStream(); parent_.grpc_mux_callbacks_.onEstablishmentFailure(); - parent_.connecting_to_failover_ = true; + parent_.connection_state_ = ConnectionState::ConnectingToFailover; parent_.failover_grpc_stream_->establishNewStream(); return; } @@ -229,8 +236,7 @@ class GrpcMuxFailover : public GrpcStreamInterface, // later by the underlying grpc stream. ENVOY_LOG_MISC(trace, "Not trying to connect to failover. 
Will try again to reconnect to the " "primary (upon retry)."); - parent_.connecting_to_primary_ = true; - parent_.connected_to_ = ConnectedState::None; + parent_.connection_state_ = ConnectionState::ConnectingToPrimary; parent_.grpc_mux_callbacks_.onEstablishmentFailure(); } @@ -240,10 +246,9 @@ class GrpcMuxFailover : public GrpcStreamInterface, !parent_.connectingToOrConnectedToFailover()); // Received a response from the primary. The primary is now considered available (no failover // will be attempted). - parent_.ever_connected_to_ = ConnectedState::Primary; + parent_.ever_connected_to_primary_ = true; primary_consecutive_failures_ = 0; - parent_.connected_to_ = ConnectedState::Primary; - parent_.connecting_to_primary_ = false; + parent_.connection_state_ = ConnectionState::ConnectedToPrimary; parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats); } @@ -268,55 +273,45 @@ class GrpcMuxFailover : public GrpcStreamInterface, // the first response to be received before considering the failover available. // Calling the onStreamEstablished() callback on the GrpcMux object will // trigger the GrpcMux to start sending requests. - ASSERT(parent_.connecting_to_failover_ && !parent_.connecting_to_primary_ && - (parent_.connected_to_ == ConnectedState::None)); + ASSERT(parent_.connection_state_ == ConnectionState::ConnectingToFailover); + ASSERT(!parent_.ever_connected_to_primary_); parent_.grpc_mux_callbacks_.onStreamEstablished(); } void onEstablishmentFailure() override { // This will be called when the failover stream fails to establish a connection, or after the // connection was closed. - ASSERT(parent_.connectingToOrConnectedToFailover() && - !parent_.connectingToOrConnectedToPrimary()); - if (parent_.ever_connected_to_ != ConnectedState::Failover) { - ASSERT(parent_.connecting_to_failover_); - // If Envoy never established a connecting the failover, it will try to connect to the - // primary next. 
- ENVOY_LOG(debug, "Failover xDS stream failed to establish a connection. Attempting to " - "connect to the primary stream."); - parent_.connecting_to_failover_ = false; - // This will close the stream and prevent the retry timer from - // reconnecting to the failover source. - parent_.failover_grpc_stream_->closeStream(); - parent_.grpc_mux_callbacks_.onEstablishmentFailure(); - // Wait for a short period of time before retrying to reconnect to the - // primary, reducing strain on the network/servers in case of an issue. - // TODO(adisuissa): In the future, the reconnection attempts to the - // primary and failover sources will be decoupled, as each will use its - // own backoff timer, and this will not be needed. - parent_.complete_retry_timer_->enableTimer(std::chrono::milliseconds(500)); - return; - } - // Pass along the failure to the GrpcMux object. Retry will be triggered - // later by the underlying grpc stream. - ENVOY_LOG_MISC(trace, "Not trying to connect to primary. Will try again to reconnect to the " - "failover (upon retry)."); - parent_.connecting_to_failover_ = true; - parent_.connected_to_ = ConnectedState::None; + ASSERT(parent_.connectingToOrConnectedToFailover()); + // Either this was an intentional disconnection from the failover source, + // or unintentional. Either way, try to connect to the primary next. + ENVOY_LOG(debug, "Failover xDS stream diconnected (either after establishing a connection or " + "before). Attempting to connect to the primary stream."); + + // This will close the stream and prevent the retry timer from + // reconnecting to the failover source. + // TODO(adisuissa): need to ensure that when moving between primary and failover, + // the initial_resource_versions that are sent are empty. This will be + // done in a followup PR. 
+ parent_.failover_grpc_stream_->closeStream(); parent_.grpc_mux_callbacks_.onEstablishmentFailure(); + // Setting the connection state to None, and when the retry timer will + // expire, Envoy will try to connect to the primary source. + parent_.connection_state_ = ConnectionState::None; + // Wait for a short period of time before retrying to reconnect to the + // primary, reducing strain on the network/servers in case of an issue. + // TODO(adisuissa): need to use the primary source's retry timer here, to wait + // for the next time to connect to the primary. This requires a refactor + // of the retry timer and moving it from the grpc_stream to here. + parent_.complete_retry_timer_->enableTimer(std::chrono::milliseconds(500)); } void onDiscoveryResponse(ResponseProtoPtr&& message, ControlPlaneStats& control_plane_stats) override { - ASSERT(parent_.connectingToOrConnectedToFailover() && - !parent_.connectingToOrConnectedToPrimary()); + ASSERT(parent_.connectingToOrConnectedToFailover()); + ASSERT(!parent_.ever_connected_to_primary_); // Received a response from the failover. The failover is now considered available (no going // back to the primary will be attempted). - // TODO(adisuissa): This will be modified in the future, when allowing the primary to always - // be preferred over the failover. - parent_.ever_connected_to_ = ConnectedState::Failover; - parent_.connected_to_ = ConnectedState::Failover; - parent_.connecting_to_failover_ = false; + parent_.connection_state_ = ConnectionState::ConnectedToFailover; parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats); } @@ -332,30 +327,14 @@ class GrpcMuxFailover : public GrpcStreamInterface, // Returns true iff the state is connecting to primary or connected to it. 
bool connectingToOrConnectedToPrimary() const { - return connecting_to_primary_ || (connected_to_ == ConnectedState::Primary); + return connection_state_ == ConnectionState::ConnectingToPrimary || + connection_state_ == ConnectionState::ConnectedToPrimary; } // Returns true iff the state is connecting to failover or connected to it. bool connectingToOrConnectedToFailover() const { - return connecting_to_failover_ || (connected_to_ == ConnectedState::Failover); - } - - // Establishes a new stream to the primary source if not connected to it. - void establishStreamToPrimaryIfNotConnected() { - if (connected_to_ != ConnectedState::Primary) { - ASSERT(connected_to_ == ConnectedState::None); - connecting_to_primary_ = true; - primary_grpc_stream_->establishNewStream(); - } - } - - // Establishes a new stream to the failover source if not connected to it. - void establishStreamToFailoverIfNotConnected() { - if (connected_to_ != ConnectedState::Failover) { - ASSERT(connected_to_ == ConnectedState::None); - connecting_to_failover_ = true; - failover_grpc_stream_->establishNewStream(); - } + return connection_state_ == ConnectionState::ConnectingToFailover || + connection_state_ == ConnectionState::ConnectedToFailover; } // The following method overrides are to allow GrpcMuxFailover to extend the @@ -389,31 +368,33 @@ class GrpcMuxFailover : public GrpcStreamInterface, // initialized when failover is supported. Event::TimerPtr complete_retry_timer_{nullptr}; - enum class ConnectedState { None, Primary, Failover }; + enum class ConnectionState { + None, + ConnectingToPrimary, + ConnectedToPrimary, + ConnectingToFailover, + ConnectedToFailover + }; // Flags to keep track of the state of connections to primary/failover. - // All initialized to false/None, as there is no connection process during - // initialization. - // The object starts with all the connecting flags set to false, and - // connected_to to None. 
Once a new stream is attempted, - // connecting_to_primary_ will become true, until a response will be received - // from the primary (connected_to_ will become Primary), or a failure - // to establish a connection to the primary occurs. In the latter case, if - // Envoy attempts to reconnect to the primary, connecting_to_primary_ will - // stay true, but if it attempts to connect to the failover, connecting_to_primary_ - // will be set to false, and connecting_to_failover_ will be true. - // The values of connecting_to_failover_ and connected_to_ set to Failover will be - // determined similar to the primary variants. + // The object starts with connection_state_ set to None (not connecting or + // connected to any source). + // Once a new stream is attempted, connection_state_ becomes ConnectingToPrimary, + // until a response is received from the primary (connection_state_ becomes + // ConnectedToPrimary), or a failure to establish a connection to the primary occurs. + // In the latter case, if Envoy attempts to reconnect to the primary, + // connection_state_ stays ConnectingToPrimary, but if it attempts to connect to the + // failover, connection_state_ is set to ConnectingToFailover. + // If Envoy receives a response from the failover, connection_state_ is set + // to ConnectedToFailover. // Note that while Envoy can only be connected to a single source (mutually // exclusive), it can attempt connecting to more than one source at a time. - bool connecting_to_primary_{false}; - bool connecting_to_failover_{false}; - ConnectedState connected_to_; + ConnectionState connection_state_; // A flag that keeps track of whether Envoy successfully connected to either the - // primary or failover source. Envoy successfully connected to a source once - // it receives a response from it. - ConnectedState ever_connected_to_; + // primary or failover source. Envoy is considered successfully connected to a source + // once it receives a response from it. 
+ bool ever_connected_to_primary_{false}; }; } // namespace Config diff --git a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc index 6e69db824701..383b13a750d6 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc @@ -398,25 +398,43 @@ TEST_F(GrpcMuxFailoverTest, PrimaryOnlyAttemptsAfterPrimaryAvailable) { grpc_mux_failover_->establishNewStream(); } -// Validate that after the failover is available (a response is received), all -// reconnect attempts will be to the failover. -TEST_F(GrpcMuxFailoverTest, FailoverOnlyAttemptsAfterFailoverAvailable) { +// Validate that after the failover is available (a response is received), Envoy +// will try to reconnect to the primary (and then failover), and keep +// alternating between the two. +TEST_F(GrpcMuxFailoverTest, AlternatingPrimaryAndFailoverAttemptsAfterFailoverAvailable) { connectToFailover(); - // Emulate 5 disconnects, and ensure the primary reconnection isn't attempted. + // Emulate 5 disconnects, alternating between the failover and the primary. for (int attempt = 0; attempt < 5; ++attempt) { - // Emulate a failover source failure that will not result in an attempt to - // connect to the primary. It should not close the failover stream (so - // the retry mechanism will kick in). - EXPECT_CALL(failover_stream_, closeStream()).Times(0); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); - EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); - failover_callbacks_->onEstablishmentFailure(); + if (attempt % 2 == 0) { + // Emulate a failover source failure that will result in an attempt to + // connect to the primary. It should close the failover stream, and + // enable the retry timer. 
+ EXPECT_CALL(failover_stream_, closeStream()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); + EXPECT_CALL(*timer_, enableTimer(_, _)); + failover_callbacks_->onEstablishmentFailure(); + // Emulate a timer tick, which should try to reconnect to the primary + // stream. + EXPECT_CALL(primary_stream_, establishNewStream()); + timer_cb_(); + } else { + // Emulate a primary source failure that will result in an attempt to + // connect to the failover. It should close the primary stream, and + // try to establish the failover stream. + EXPECT_CALL(primary_stream_, closeStream()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + EXPECT_CALL(failover_stream_, establishNewStream()); + primary_callbacks_->onEstablishmentFailure(); + } } - // Emulate a call to establishNewStream(). - EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); - EXPECT_CALL(failover_stream_, establishNewStream()); + // Last attempt ended with failing to establish a failover stream, + // emulate a successful primary stream. + EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); + EXPECT_CALL(primary_stream_, establishNewStream()); grpc_mux_failover_->establishNewStream(); } @@ -603,6 +621,29 @@ TEST_F(GrpcMuxFailoverTest, OnWriteableConnectedToPrimaryInvoked) { primary_callbacks_->onWriteable(); } +// Validates that when connected to primary, a subsequent call to establishNewStream +// will not try to recreate the stream. +TEST_F(GrpcMuxFailoverTest, NoRecreateStreamWhenConnectedToPrimary) { + // Validate connected to primary. 
+ { + connectToPrimary(); + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); + grpc_mux_failover_->establishNewStream(); + } +} + +// Validates that when connected to failover, a subsequent call to establishNewStream +// will not try to recreate the stream. +TEST_F(GrpcMuxFailoverTest, NoRecreateStreamWhenConnectedToFailover) { + // Validate connected to failover. + { + connectToFailover(); + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); + grpc_mux_failover_->establishNewStream(); + } +} } // namespace } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc b/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc index d48a51b16980..93be6621f445 100644 --- a/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc +++ b/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc @@ -183,6 +183,14 @@ class XdsFailoverAdsIntegrationTest : public AdsDeltaSotwIntegrationSubStatePara RELEASE_ASSERT(result, result.message()); } + // Waits for a failover source connected and immediately disconnects. + void failoverConnectionFailure() { + AssertionResult result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->close(); + RELEASE_ASSERT(result, result.message()); + } + envoy::config::endpoint::v3::ClusterLoadAssignment buildClusterLoadAssignment(const std::string& name) { return ConfigHelper::buildClusterLoadAssignment( @@ -559,8 +567,8 @@ TEST_P(XdsFailoverAdsIntegrationTest, NoFailoverUseAfterPrimaryResponse) { xds_stream_.get())); } -// Validate that once failover answers, primary will not be used, even after disconnecting. 
-TEST_P(XdsFailoverAdsIntegrationTest, NoPrimaryUseAfterFailoverResponse) { +// Validate that once failover responds, and then disconnects, primary will be attempted. +TEST_P(XdsFailoverAdsIntegrationTest, PrimaryUseAfterFailoverResponseAndDisconnect) { // These tests are not executed with GoogleGrpc because they are flaky due to // the large timeout values for retries. SKIP_IF_GRPC_CLIENT(Grpc::ClientType::GoogleGrpc); @@ -579,9 +587,9 @@ TEST_P(XdsFailoverAdsIntegrationTest, NoPrimaryUseAfterFailoverResponse) { ASSERT_TRUE(xds_connection_->waitForDisconnect()); // The CDS request fails when the primary disconnects. After that fetch the config // dump to ensure that the retry timer kicks in. - waitForPrimaryXdsRetryTimer(); // Expect another connection attempt to the primary. Reject the stream (gRPC failure) immediately. // As this is a 2nd consecutive failure, it will trigger failover. + waitForPrimaryXdsRetryTimer(); primaryConnectionFailure(); ASSERT_TRUE(xds_connection_->waitForDisconnect()); @@ -601,68 +609,285 @@ TEST_P(XdsFailoverAdsIntegrationTest, NoPrimaryUseAfterFailoverResponse) { Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); sendDiscoveryResponse( - CdsTypeUrl, {ConfigHelper::buildCluster("cluster_0")}, - {ConfigHelper::buildCluster("cluster_0")}, {}, "1", {}, failover_xds_stream_.get()); + CdsTypeUrl, {ConfigHelper::buildCluster("failover_cluster_0")}, + {ConfigHelper::buildCluster("failover_cluster_0")}, {}, "failover1", {}, + failover_xds_stream_.get()); + // Wait for an EDS request, and send its response. 
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); - test_server_->waitForGaugeEq("cluster.cluster_0.warming_state", 1); + test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 1); + EXPECT_TRUE(compareDiscoveryRequest( + EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); + sendDiscoveryResponse( + EdsTypeUrl, {buildClusterLoadAssignment("failover_cluster_0")}, + {buildClusterLoadAssignment("failover_cluster_0")}, {}, "failover1", {}, + failover_xds_stream_.get()); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 0); + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "failover1", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); - // Envoy has received a CDS response, it means the primary is available. - // Now disconnect the primary. + // Envoy has received CDS and EDS responses, it means the failover is available. + // Now disconnect the failover source. failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); - // CDS was successful, but EDS will fail. After that add a notification to the + // CDS and EDS were successful, but LDS will fail. After that add a notification to the // main thread to ensure that the retry timer kicks in. 
- test_server_->waitForCounterGe("cluster.cluster_0.update_failure", 1); + test_server_->waitForCounterGe("listener_manager.lds.update_failure", 1); absl::Notification notification; test_server_->server().dispatcher().post([&]() { notification.Notify(); }); notification.WaitForNotification(); timeSystem().advanceTimeWait(std::chrono::milliseconds(1000)); - // In this case (received a response), both EnvoyGrpc and GoogleGrpc keep the connection open. + // The failover disconnected, so the next step is trying to connect to the + // primary. First ensure that the failover isn't being attempted, and then let + // the connection to the primary succeed. + EXPECT_FALSE(failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_, + std::chrono::seconds(5))); + EXPECT_TRUE(xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_)); + + // Allow receiving config from the primary. + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + xds_stream_->startGrpcStream(); + + // Ensure basic flow with primary works. Validate that the + // initial_resource_versions for delta-xDS is empty. + // TODO(adisuissa): ensure initial_resource_versions is empty, once this is supported. 
+ EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + xds_stream_.get())); + EXPECT_TRUE( + compareDiscoveryRequest(EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, + false, Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream_.get())); + EXPECT_TRUE(compareDiscoveryRequest(LdsTypeUrl, "", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + xds_stream_.get())); + sendDiscoveryResponse( + CdsTypeUrl, {ConfigHelper::buildCluster("primary_cluster_0")}, + {ConfigHelper::buildCluster("primary_cluster_0")}, {}, "primary1", {}, xds_stream_.get()); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + test_server_->waitForGaugeEq("cluster.primary_cluster_0.warming_state", 1); + + // Expect an updated EDS request (for the primary's cluster) on the primary stream. + EXPECT_TRUE(compareDiscoveryRequest(EdsTypeUrl, "", {"primary_cluster_0"}, {"primary_cluster_0"}, + {}, false, Grpc::Status::WellKnownGrpcStatus::Ok, "", + xds_stream_.get())); +} + +// Validates that if failover is used, and then disconnected, and the primary +// still doesn't respond, failover will be attempted with the correct +// initial_resource_versions. +TEST_P(XdsFailoverAdsIntegrationTest, FailoverUseAfterFailoverResponseAndDisconnect) { + // These tests are not executed with GoogleGrpc because they are flaky due to + // the large timeout values for retries. + SKIP_IF_GRPC_CLIENT(Grpc::ClientType::GoogleGrpc); +#ifdef ENVOY_ENABLE_UHV + // With UHV the finishGrpcStream() isn't detected as invalid frame because of + // no ":status" header, unless "envoy.reloadable_features.enable_universal_header_validator" + // is also enabled. + config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator", + "true"); +#endif + initialize(); + + // 2 consecutive primary failures. + // Expect a connection to the primary. Reject the connection immediately. 
+ primaryConnectionFailure(); + ASSERT_TRUE(xds_connection_->waitForDisconnect()); + // The CDS request fails when the primary disconnects. After that fetch the config + // dump to ensure that the retry timer kicks in. + // Expect another connection attempt to the primary. Reject the stream (gRPC failure) immediately. + // As this is a 2nd consecutive failure, it will trigger failover. + waitForPrimaryXdsRetryTimer(); + primaryConnectionFailure(); + ASSERT_TRUE(xds_connection_->waitForDisconnect()); + + // The CDS request fails when the primary disconnects. + test_server_->waitForCounterGe("cluster_manager.cds.update_failure", 2); + + AssertionResult result = + failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + // Failover is healthy, start the ADS gRPC stream. result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); RELEASE_ASSERT(result, result.message()); - // Immediately fail the connection. - failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); - - // Ensure that Envoy still attempts to connect to the primary, - // and keep disconnecting a few times and validate that the failover - // connection isn't attempted. - for (int i = 1; i < 5; ++i) { - ASSERT_TRUE(failover_xds_connection_->waitForDisconnect()); - // Wait longer due to the fixed 5 seconds failover . - waitForPrimaryXdsRetryTimer(i, 5); - // EnvoyGrpc will disconnect if the gRPC stream is immediately closed (as - // done above). - result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); - RELEASE_ASSERT(result, result.message()); - result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); - RELEASE_ASSERT(result, result.message()); - // Immediately fail the connection. 
- failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); - } + failover_xds_stream_->startGrpcStream(); - // When EnvoyGrpc is used, no new connection to the primary will be attempted. - EXPECT_FALSE( - xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_, std::chrono::seconds(1))); + // Ensure basic flow with failover works. + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + sendDiscoveryResponse( + CdsTypeUrl, {ConfigHelper::buildCluster("failover_cluster_0")}, + {ConfigHelper::buildCluster("failover_cluster_0")}, {}, "failover1", {}, + failover_xds_stream_.get()); + // Wait for an EDS request, and send its response. + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 1); + EXPECT_TRUE(compareDiscoveryRequest( + EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); + sendDiscoveryResponse( + EdsTypeUrl, {buildClusterLoadAssignment("failover_cluster_0")}, + {buildClusterLoadAssignment("failover_cluster_0")}, {}, "failover1", {}, + failover_xds_stream_.get()); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 0); + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "failover1", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + // Envoy has received CDS and EDS responses, it means the failover is available. + // Now disconnect the failover. 
+ failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); + EXPECT_TRUE(failover_xds_connection_->close()); ASSERT_TRUE(failover_xds_connection_->waitForDisconnect()); - // Wait longer due to the fixed 5 seconds failover . - waitForPrimaryXdsRetryTimer(5, 5); - // Allow a connection to the failover. - // Expect a connection to the failover when using EnvoyGrpc. - // In case GoogleGrpc is used the current connection will be reused (new stream). + // Wait longer due to the fixed 5 seconds failover. + waitForPrimaryXdsRetryTimer(3, 5); + + // The failover disconnected, so the next step is trying to connect to the + // primary source. Disconnect the primary source immediately. + primaryConnectionFailure(); + ASSERT_TRUE(xds_connection_->waitForDisconnect()); + + // Expect a connection to the failover source to be attempted. result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); RELEASE_ASSERT(result, result.message()); + + // Failover is healthy, start the ADS gRPC stream. result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); failover_xds_stream_->startGrpcStream(); - // Validate that just the initial requests are sent to the primary. - EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "1", {}, {}, {}, true, + // Ensure basic flow with primary works. Validate that the + // initial_resource_versions for delta-xDS is empty. + // TODO(adisuissa): ensure initial_resource_versions contains the correct versions. 
+ EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); - EXPECT_TRUE(compareDiscoveryRequest(EdsTypeUrl, "", {"cluster_0"}, {"cluster_0"}, {}, false, + EXPECT_TRUE(compareDiscoveryRequest( + EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); + EXPECT_TRUE(compareDiscoveryRequest(LdsTypeUrl, "", {}, {}, {}, false, Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); + sendDiscoveryResponse( + CdsTypeUrl, {ConfigHelper::buildCluster("failover_cluster_1")}, + {ConfigHelper::buildCluster("failover_cluster_1")}, {}, "failover2", {}, + failover_xds_stream_.get()); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + test_server_->waitForGaugeEq("cluster.failover_cluster_1.warming_state", 1); + + // Expect an updated failover EDS request. + EXPECT_TRUE(compareDiscoveryRequest( + EdsTypeUrl, "", {"failover_cluster_1"}, {"failover_cluster_1"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); +} + +// Validate that once failover responds, and then disconnects, Envoy +// will alternate between the primary and failover sources (multiple times) if +// both are not responding. +TEST_P(XdsFailoverAdsIntegrationTest, + PrimaryAndFailoverAttemptsAfterFailoverResponseAndDisconnect) { + // These tests are not executed with GoogleGrpc because they are flaky due to + // the large timeout values for retries. + SKIP_IF_GRPC_CLIENT(Grpc::ClientType::GoogleGrpc); +#ifdef ENVOY_ENABLE_UHV + // With UHV the finishGrpcStream() isn't detected as invalid frame because of + // no ":status" header, unless "envoy.reloadable_features.enable_universal_header_validator" + // is also enabled. 
+ config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_universal_header_validator", + "true"); +#endif + initialize(); + + // 2 consecutive primary failures. + // Expect a connection to the primary. Reject the connection immediately. + primaryConnectionFailure(); + ASSERT_TRUE(xds_connection_->waitForDisconnect()); + // The CDS request fails when the primary disconnects. After that fetch the config + // dump to ensure that the retry timer kicks in. + // Expect another connection attempt to the primary. Reject the stream (gRPC failure) immediately. + // As this is a 2nd consecutive failure, it will trigger failover. + waitForPrimaryXdsRetryTimer(); + primaryConnectionFailure(); + ASSERT_TRUE(xds_connection_->waitForDisconnect()); + + // The CDS request fails when the primary disconnects. + test_server_->waitForCounterGe("cluster_manager.cds.update_failure", 2); + + AssertionResult result = + failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + // Failover is healthy, start the ADS gRPC stream. + result = failover_xds_connection_->waitForNewStream(*dispatcher_, failover_xds_stream_); + RELEASE_ASSERT(result, result.message()); + failover_xds_stream_->startGrpcStream(); + + // Ensure basic flow with failover works. + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + sendDiscoveryResponse( + CdsTypeUrl, {ConfigHelper::buildCluster("failover_cluster_0")}, + {ConfigHelper::buildCluster("failover_cluster_0")}, {}, "failover1", {}, + failover_xds_stream_.get()); + // Wait for an EDS request, and send its response. 
+ test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 1); + EXPECT_TRUE(compareDiscoveryRequest( + EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); + sendDiscoveryResponse( + EdsTypeUrl, {buildClusterLoadAssignment("failover_cluster_0")}, + {buildClusterLoadAssignment("failover_cluster_0")}, {}, "failover1", {}, + failover_xds_stream_.get()); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + test_server_->waitForGaugeEq("cluster.failover_cluster_0.warming_state", 0); + EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "failover1", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {}, {}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get())); + + // Envoy has received CDS and EDS responses, it means the failover is available. + // Now disconnect the failover source. + failover_xds_stream_->finishGrpcStream(Grpc::Status::Internal); + EXPECT_TRUE(failover_xds_connection_->close()); + ASSERT_TRUE(failover_xds_connection_->waitForDisconnect()); + + // Ensure Envoy alternates between primary and failover. + // Up to this step there were 3 CDS update failures. In each iteration of the + // next loop there are going to be 2 failures (one for primary and another for + // failover). + for (int i = 1; i < 3; ++i) { + // Wait longer due to the fixed 5 seconds failover . + waitForPrimaryXdsRetryTimer(2 * i + 1, 5); + + // The failover disconnected, so the next step is trying to connect to the + // primary source. Disconnect the primary source immediately. 
+ primaryConnectionFailure(); + ASSERT_TRUE(xds_connection_->waitForDisconnect()); + + // Expect a connection to the failover source to be attempted. Disconnect + // immediately. + result = failover_xds_upstream_->waitForHttpConnection(*dispatcher_, failover_xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = failover_xds_connection_->close(); + RELEASE_ASSERT(result, result.message()); + ASSERT_TRUE(failover_xds_connection_->waitForDisconnect()); + } } } // namespace Envoy