Skip to content

Commit

Permalink
DISCOVERY: Cleanup AbstractDisruptionTestCase (#34808)
Browse files Browse the repository at this point in the history
* DISCOVERY: Cleanup AbstractDisruptionTestCase

* Make the internal test cluster manage minimum master nodes where we used the default of (nodes / 2 + 1) before
* Remove use of the `NodeConfigurationSource` indirection
* Relates #33675
  • Loading branch information
original-brownbear authored Oct 31, 2018
1 parent 455906d commit 3fa67c5
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,19 @@

package org.elasticsearch.discovery;

import java.nio.file.Path;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
Expand All @@ -56,27 +52,20 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {

static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.

private NodeConfigurationSource discoveryConfig;

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(discoveryConfig.nodeSettings(nodeOrdinal))
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(DEFAULT_SETTINGS)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}

@Before
public void clearConfig() {
discoveryConfig = null;
}

@Override
protected int numberOfShards() {
return 3;
Expand Down Expand Up @@ -119,11 +108,6 @@ protected void beforeIndexDeletion() throws Exception {
}

List<String> startCluster(int numberOfNodes) {
return startCluster(numberOfNodes, -1);
}

List<String> startCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(numberOfNodes, minimumMasterNode);
InternalTestCluster internalCluster = internalCluster();
List<String> nodes = internalCluster.startNodes(numberOfNodes);
ensureStableCluster(numberOfNodes);
Expand Down Expand Up @@ -152,38 +136,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

void configureCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(DEFAULT_SETTINGS, numberOfNodes, minimumMasterNode);
}

void configureCluster(Settings settings, int numberOfNodes, int minimumMasterNode) {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
logger.info("---> configured unicast");
// TODO: Rarely use default settings form some of these
Settings nodeSettings = Settings.builder()
.put(settings)
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file")
.build();

if (discoveryConfig == null) {
discoveryConfig = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(final int nodeOrdinal) {
return nodeSettings;
}

@Override
public Path nodeConfigPath(final int nodeOrdinal) {
return null;
}
};
}
}

ClusterState getNodeClusterState(String node) {
return client(node).admin().cluster().prepareState().setLocal(true).get().getState();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClusterDisruptionCleanSettingsIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

/**
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
* node but already deleted on the source node. Search request should still work.
*/
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
// Don't use AbstractDisruptionTestCase.DEFAULT_SETTINGS as settings
// (which can cause node disconnects on a slow CI machine)
internalCluster().startMasterOnlyNode();
final String node_1 = internalCluster().startDataOnlyNode();

logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
Settings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
);
ensureGreen("test");

final String node_2 = internalCluster().startDataOnlyNode();
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("_doc")
.setSource("{\"int_field\":1}", XContentType.JSON));
}
indexRandom(true, indexRequestBuilderList);

IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
// now search for the documents and see if we get a reply
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -36,9 +35,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
Expand Down Expand Up @@ -72,8 +69,8 @@
/**
* Tests various cluster operations (e.g., indexing) during disruptions.
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {

/**
Expand Down Expand Up @@ -289,7 +286,7 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception {

// simulate handling of sending shard failure during an isolation
public void testSendingShardFailure() throws Exception {
List<String> nodes = startCluster(3, 2);
List<String> nodes = startCluster(3);
String masterNode = internalCluster().getMasterName();
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
String nonMasterNode = randomFrom(nonMasterNodes);
Expand Down Expand Up @@ -357,43 +354,10 @@ public void onFailure(Exception e) {
}
}

/**
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
* node but already deleted on the source node. Search request should still work.
*/
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
configureCluster(Settings.EMPTY, 3, 1);
internalCluster().startMasterOnlyNode();
final String node_1 = internalCluster().startDataOnlyNode();

logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
Settings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
);
ensureGreen("test");

final String node_2 = internalCluster().startDataOnlyNode();
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("_doc")
.setSource("{\"int_field\":1}", XContentType.JSON));
}
indexRandom(true, indexRequestBuilderList);

IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
// now search for the documents and see if we get a reply
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
}

public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
// test for https://github.com/elastic/elasticsearch/issues/8823
configureCluster(2, 1);
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNode(Settings.EMPTY);

ensureStableCluster(2);
assertAcked(prepareCreate("index").setSettings(Settings.builder().put("index.number_of_replicas", 0)));
index("index", "_doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
Expand All @@ -416,14 +380,12 @@ public boolean clearData(String nodeName) {
*/
public void testIndicesDeleted() throws Exception {
final Settings settings = Settings.builder()
.put(DEFAULT_SETTINGS)
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
.build();
final String idxName = "test";
configureCluster(settings, 3, 2);
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2);
final String dataNode = internalCluster().startDataOnlyNode();
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2, settings);
final String dataNode = internalCluster().startDataOnlyNode(settings);
ensureStableCluster(3);
assertAcked(prepareCreate("test"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@
/**
* Tests for discovery during disruptions.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {

public void testIsolatedUnicastNodes() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4, -1);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String unicastTarget = nodes.get(0);

Expand Down Expand Up @@ -100,7 +100,7 @@ public void testIsolatedUnicastNodes() throws Exception {
*/
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4, -1);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node={}", masterNode);
Expand Down Expand Up @@ -138,15 +138,8 @@ public void testUnicastSinglePingResponseContainsMaster() throws Exception {
* Test cluster join with issues in cluster state publishing *
*/
public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
List<String> nodes = startCluster(2, 1);

String masterNode = internalCluster().getMasterName();
String nonMasterNode;
if (masterNode.equals(nodes.get(0))) {
nonMasterNode = nodes.get(1);
} else {
nonMasterNode = nodes.get(0);
}
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
String nonMasterNode = internalCluster().startDataOnlyNode(Settings.EMPTY);

DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();

Expand Down Expand Up @@ -196,7 +189,6 @@ public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
}

public void testClusterFormingWithASlowNode() throws Exception {
configureCluster(3, 2);

SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);

Expand All @@ -212,7 +204,6 @@ public void testClusterFormingWithASlowNode() throws Exception {
}

public void testElectMasterWithLatestVersion() throws Exception {
configureCluster(3, 2);
final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
ensureStableCluster(3);
ServiceDisruptionScheme isolateAllNodes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
/**
* Tests relating to the loss of the master.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class MasterDisruptionIT extends AbstractDisruptionTestCase {

/**
Expand Down Expand Up @@ -153,8 +153,8 @@ public void testNodesFDAfterMasterReelection() throws Exception {
*/
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.test.disruption:TRACE")
public void testStaleMasterNotHijackingMajority() throws Exception {
// 3 node cluster with unicast discovery and minimum_master_nodes set to 2:
final List<String> nodes = startCluster(3, 2);
// 3 node cluster with unicast discovery and minimum_master_nodes set to the default of 2:
final List<String> nodes = startCluster(3);

// Save the current master node as old master node, because that node will get frozen
final String oldMasterNode = internalCluster().getMasterName();
Expand Down Expand Up @@ -267,7 +267,7 @@ public void onFailure(String source, Exception e) {
* Test that cluster recovers from a long GC on master that causes other nodes to elect a new one
*/
public void testMasterNodeGCs() throws Exception {
List<String> nodes = startCluster(3, -1);
List<String> nodes = startCluster(3);

String oldMasterNode = internalCluster().getMasterName();
// a very long GC, but it's OK as we remove the disruption when it has had an effect
Expand Down
Loading

0 comments on commit 3fa67c5

Please sign in to comment.