Skip to content

Commit

Permalink
Allow cluster access during node restart (elastic#42946)
Browse files Browse the repository at this point in the history
This commit modifies InternalTestCluster to allow using client() and
other operations inside a RestartCallback (typically in onNodeStopped).
A restarting node is now removed from the cluster's node map while it is
stopped, so all methods return state as if that node did not exist.

This avoids various exceptions stemming from accessing the stopped
node(s).
  • Loading branch information
henningandersen authored Jun 17, 2019
1 parent f828c77 commit 4fd7a22
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -55,7 +56,11 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
Expand Down Expand Up @@ -369,14 +374,7 @@ public void testRecoverBrokenIndexMetadata() throws Exception {
// this one is not validated ahead of time and breaks allocation
.put("index.analysis.filter.myCollator.type", "icu_collation")
).build();
internalCluster().fullRestart(new RestartCallback(){
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
return super.onNodeStopped(nodeName);
}
});
writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta));

// check that the cluster does not keep reallocating shards
assertBusy(() -> {
Expand Down Expand Up @@ -443,14 +441,7 @@ public void testRecoverMissingAnalyzer() throws Exception {
final IndexMetaData metaData = state.getMetaData().index("test");
final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings()
.filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build();
internalCluster().fullRestart(new RestartCallback(){
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
return super.onNodeStopped(nodeName);
}
});
writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta));

// check that the cluster does not keep reallocating shards
assertBusy(() -> {
Expand Down Expand Up @@ -495,14 +486,7 @@ public void testArchiveBrokenClusterSettings() throws Exception {
final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder()
.put(metaData.persistentSettings()).put("this.is.unknown", true)
.put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build();
internalCluster().fullRestart(new RestartCallback(){
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta);
return super.onNodeStopped(nodeName);
}
});
writeBrokenMeta(metaStateService -> metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta));

ensureYellow("test"); // wait for state recovery
state = client().admin().cluster().prepareState().get().getState();
Expand All @@ -519,4 +503,17 @@ public Settings onNodeStopped(String nodeName) throws Exception {
+ MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()));
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
}

/**
 * Performs a full cluster restart and, as each node reports that it has stopped, applies
 * {@code writer} to the {@link MetaStateService} that belonged to that node.
 *
 * @param writer action that writes the broken metadata through a node's {@link MetaStateService}
 * @throws Exception if the restart or the writer fails
 */
private void writeBrokenMeta(CheckedConsumer<MetaStateService, IOException> writer) throws Exception {
    // Look up every node's MetaStateService before the restart begins: a node is removed from the
    // cluster's node map before its onNodeStopped callback runs, so the lookup is done while all
    // nodes are still registered.
    final String[] nodeNames = internalCluster().getNodeNames();
    final Map<String, MetaStateService> servicesByNode = Stream.of(nodeNames)
        .collect(Collectors.toMap(Function.identity(), name -> internalCluster().getInstance(MetaStateService.class, name)));

    final RestartCallback brokenMetaWriter = new RestartCallback() {
        @Override
        public Settings onNodeStopped(String nodeName) throws Exception {
            writer.accept(servicesByNode.get(nodeName));
            return super.onNodeStopped(nodeName);
        }
    };
    internalCluster().fullRestart(brokenMetaWriter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
Expand Down Expand Up @@ -889,6 +888,7 @@ void startNode() {
Settings closeForRestart(RestartCallback callback) throws Exception {
assert callback != null;
close();
removeNode(this);
Settings callbackSettings = callback.onNodeStopped(name);
assert callbackSettings != null;
Settings.Builder newSettings = Settings.builder();
Expand Down Expand Up @@ -1648,20 +1648,9 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
final Settings newSettings = nodeAndClient.closeForRestart(callback);
removeExclusions(excludedNodeIds);

boolean success = false;
try {
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
nodeAndClient.startNode();
success = true;
} finally {
if (success == false) {
removeNode(nodeAndClient);
}
}

if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(Collections.singletonList(nodeAndClient)));
nodeAndClient.startNode();
publishNode(nodeAndClient);

if (callback.validateClusterForming() || excludedNodeIds.isEmpty() == false) {
// we have to validate cluster size to ensure that the restarted node has rejoined the cluster if it was master-eligible;
Expand Down Expand Up @@ -1728,6 +1717,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
final Settings[] newNodeSettings = new Settings[nextNodeId.get()];
Map<Set<DiscoveryNodeRole>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
final int nodeCount = nodes.size();
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Stopping and resetting node [{}] ", nodeAndClient.name);
Expand All @@ -1741,7 +1731,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
}

assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodes.size();
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodeCount;

// randomize start up order, but making sure that:
// 1) A data folder that was assigned to a data node will stay so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.elasticsearch.test.test;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;

import java.io.IOException;

Expand Down Expand Up @@ -61,4 +64,26 @@ public void testStoppingNodesOneByOne() throws IOException {

ensureGreen();
}

/**
 * Checks that cluster-level operations can be used from inside a restart callback, including
 * starting a second (nested) data node restart while the first restart is still in progress.
 */
public void testOperationsDuringRestart() throws Exception {
    internalCluster().startMasterOnlyNode();
    internalCluster().startDataOnlyNodes(2);

    final InternalTestCluster.RestartCallback outerCallback = new InternalTestCluster.RestartCallback() {
        @Override
        public Settings onNodeStopped(String nodeName) throws Exception {
            // These calls run while the first restarted node is stopped.
            ensureGreen();
            internalCluster().validateClusterFormed();
            assertNotNull(internalCluster().getInstance(NodeClient.class));

            // Trigger another data node restart from within this callback.
            final InternalTestCluster.RestartCallback nestedCallback = new InternalTestCluster.RestartCallback() {
                @Override
                public Settings onNodeStopped(String nestedNodeName) throws Exception {
                    ensureGreen();
                    internalCluster().validateClusterFormed();
                    return super.onNodeStopped(nestedNodeName);
                }
            };
            internalCluster().restartRandomDataNode(nestedCallback);

            return super.onNodeStopped(nodeName);
        }
    };
    internalCluster().restartRandomDataNode(outerCallback);
}
}

0 comments on commit 4fd7a22

Please sign in to comment.