Only allow rebalance operations to run if all shard store data is available

This commit prevents rebalance operations from running while the store allocator is still fetching
async shard / store data, to avoid premature rebalance decisions that would have to be reverted once
the shard store data becomes available. This typically happens during rolling restarts and can make
those restarts extremely painful.

Closes elastic#14387
s1monw committed Nov 10, 2015
1 parent 1c7bf02 commit 479711d
Showing 8 changed files with 201 additions and 5 deletions.
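At a high level, the change threads a single flag through each allocation run: the gateway allocator raises it whenever an async shard / store fetch has not returned yet, and the rebalance step checks it before moving any shards. The sketch below is only a simplified illustration of that control flow with made-up names; the real code lives in RoutingAllocation, the gateway allocator, ShardsAllocators and BalancedShardsAllocator in the diff that follows.

// Simplified sketch of the control flow introduced by this commit
// (hypothetical class and method names; not code from the diff below).
final class RebalanceGateSketch {

    /** stands in for RoutingAllocation and its new hasPendingAsyncFetch flag */
    static final class AllocationRun {
        private boolean hasPendingAsyncFetch = false;

        void setHasPendingAsyncFetch() { this.hasPendingAsyncFetch = true; }
        boolean hasPendingAsyncFetch() { return hasPendingAsyncFetch; }
    }

    /** gateway-allocator side: record that shard / store data is still being fetched */
    static void allocateUnassigned(AllocationRun run, boolean fetchStillInFlight) {
        if (fetchStillInFlight) {
            run.setHasPendingAsyncFetch(); // taint this run instead of guessing
        }
        // ... assign whatever shards can already be decided ...
    }

    /** rebalance side: skip rebalancing for this run while fetches are pending */
    static boolean rebalance(AllocationRun run) {
        if (run.hasPendingAsyncFetch()) {
            return false; // no shard movement until all fetches have returned
        }
        // ... normal rebalance logic would run here ...
        return true;
    }
}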
@@ -116,6 +116,9 @@ public RoutingExplanations explanations() {

private boolean debugDecision = false;

private boolean hasPendingAsyncFetch = false;


/**
* Creates a new {@link RoutingAllocation}
*
@@ -244,4 +247,20 @@ public Decision decision(Decision decision, String deciderLabel, String reason,
return decision;
}
}

/**
* Returns <code>true</code> iff the current allocation run has not yet processed all of the in-flight or available
* shard or store fetches, otherwise <code>false</code>.
*/
public boolean hasPendingAsyncFetch() {
return hasPendingAsyncFetch;
}

/**
* Sets a flag that signals that the current allocation run has not processed all of the in-flight or available
* shard or store fetches. Once set, the flag remains set for the rest of the allocation run; it is only cleared
* when a new allocation run starts.
*/
public void setHasPendingAsyncFetch() {
this.hasPendingAsyncFetch = true;
}
}
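Note that the new flag is deliberately one-way within a run: there is only a setter that raises it and nothing that clears it, so any allocator still waiting on fetch results taints the entire run, and the flag is effectively reset only because every reroute builds a fresh RoutingAllocation. For orientation, a minimal illustration of the intended call pattern (the helper class and method names are made up; it simply mirrors the guard that ShardsAllocators gains further down in this diff):

import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;

// Illustrative helper, not part of the commit.
final class RebalanceGuard {
    static boolean rebalanceIfSafe(RoutingAllocation allocation, ShardsAllocator allocator) {
        if (allocation.hasPendingAsyncFetch()) {
            // still waiting on async shard / store data; defer rebalancing to a later run
            return false;
        }
        return allocator.rebalance(allocation);
    }
}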
@@ -124,7 +124,8 @@ public void applyFailedShards(FailedRerouteAllocation allocation) { /* ONLY FOR

@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
return rebalance(allocation);
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.allocateUnassigned();
}

@Override
@@ -342,6 +343,15 @@ private static boolean lessThan(float delta, float threshold) {
return delta <= (threshold + 0.001f);
}

/**
* Allocates all possible unassigned shards
* @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code>
*/
final boolean allocateUnassigned() {
return balance(true);
}

/**
* Balances the nodes on the cluster model according to the weight
* function. The configured threshold is the minimum delta between the
@@ -357,16 +367,24 @@ private static boolean lessThan(float delta, float threshold) {
* changed, otherwise <code>false</code>
*/
public boolean balance() {
return balance(false);
}

private boolean balance(boolean onlyAssign) {
if (this.nodes.isEmpty()) {
/* with no nodes this is pointless */
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Start balancing cluster");
if (onlyAssign) {
logger.trace("Start assigning unassigned shards");
} else {
logger.trace("Start balancing cluster");
}
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned().transactionBegin();
boolean changed = initialize(routingNodes, unassigned);
if (!changed && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
NodeSorter sorter = newNodeSorter();
if (nodes.size() > 1) { /* skip if we only have one node */
for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) {
@@ -78,7 +78,19 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {

@Override
public boolean rebalance(RoutingAllocation allocation) {
return allocator.rebalance(allocation);
if (allocation.hasPendingAsyncFetch() == false) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
* we might end up with unnecessary rebalance operations which can be super confusing/frustrating
* since once the fetches come back we might just move all the shards back again.
* Therefore we only do a rebalance if we have fetched all information.
*/
return allocator.rebalance(allocation);
} else {
logger.debug("skipping rebalance due to in-flight shard/store fetches");
return false;
}
}

@Override
@@ -186,6 +186,7 @@ protected Settings getIndexSettings(String index) {
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards> shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
if (shardState.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
allocation.setHasPendingAsyncFetch();
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue;
@@ -422,6 +423,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
if (shardStores.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
allocation.setHasPendingAsyncFetch();
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue; // still fetching
@@ -19,18 +19,23 @@

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.gateway.none.NoneGatewayAllocator;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.anyOf;
@@ -629,4 +634,93 @@ public void testClusterAllActive3() {

assertThat(routingNodes.node("node3").isEmpty(), equalTo(true));
}

public void testRebalanceWhileShardFetching() {
final AtomicBoolean hasFetches = new AtomicBoolean(true);
AllocationService strategy = createAllocationService(settingsBuilder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE,
ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoneGatewayAllocator() {
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
if (hasFetches.get()) {
allocation.setHasPendingAsyncFetch();
}
return super.allocateUnassigned(allocation);
}
});

MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(2).numberOfReplicas(0))
.put(IndexMetaData.builder("test1").settings(settingsBuilder().put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_id", "node1,node2")).numberOfShards(2).numberOfReplicas(0))
.build();

// we use a second index here (test1) that never gets assigned; otherwise allocateUnassigned is never called, since there would be no unassigned shards.
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.addAsNew(metaData.index("test1"))
.build();

ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();

logger.info("start two nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
}

logger.debug("start all the primary shards for test");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test", INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}

logger.debug("now, start 1 more node, check that rebalancing will not happen since we have shard sync going on");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node2")))
.build();
logger.debug("reroute and check that nothing has changed");
RoutingAllocation.Result reroute = strategy.reroute(clusterState);
assertFalse(reroute.changed());
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(UNASSIGNED));
}
logger.debug("now set hasFetches to true and reroute we should now see exactly one relocating shard");
hasFetches.set(false);
reroute = strategy.reroute(clusterState);
assertTrue(reroute.changed());
routingTable = reroute.routingTable();
int numStarted = 0;
int numRelocating = 0;
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {

assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
if (routingTable.index("test").shard(i).primaryShard().state() == STARTED) {
numStarted++;
} else if (routingTable.index("test").shard(i).primaryShard().state() == RELOCATING) {
numRelocating++;
}
}
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(UNASSIGNED));
}
assertEquals(1, numStarted);
assertEquals(1, numRelocating);

}
}
@@ -22,11 +22,18 @@
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
@@ -130,4 +137,40 @@ public void testFullRollingRestart() throws Exception {
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), 2000l);
}
}

@Slow
public void testNoRebalanceOnRollingRestart() throws Exception {
// see https://github.com/elastic/elasticsearch/issues/14387
internalCluster().startNode(ImmutableSettings.settingsBuilder().put("node.master", true).put("node.data", false).put("gateway.type", "local").build());
internalCluster().startNodesAsync(3, ImmutableSettings.settingsBuilder().put("node.master", false).put("gateway.type", "local").build()).get();

/**
* We start 3 data nodes and a dedicated master, restart one of the data nodes and ensure that we get no relocations.
* The index has 6 shards and 0 replicas, so each data node holds 2 shards; when the restarted node comes back with
* nothing allocated, the other two nodes would normally be candidates for relocating shards onto it.
* With the fix we now wait until all unassigned shards have been allocated before rebalancing, so this shouldn't happen.
*/
prepareCreate("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "6").put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0").put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueMinutes(1))).get();

for (int i = 0; i < 100; i++) {
client().prepareIndex("test", "type1", Long.toString(i))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map()).execute().actionGet();
}
ensureGreen();
ClusterState state = client().admin().cluster().prepareState().get().getState();
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
RecoveryState recoveryState = response.recoveryState();
assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode() + "\n" + state.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION);
}
internalCluster().restartRandomDataNode();
ensureGreen();
ClusterState afterState = client().admin().cluster().prepareState().get().getState();

recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
RecoveryState recoveryState = response.recoveryState();
assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode()+ "-- \nbefore: \n" + state.prettyPrint() + "\nafter: \n" + afterState.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION);
}
}
}
@@ -26,6 +26,8 @@
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@@ -57,6 +59,12 @@ public static AllocationService createAllocationService(Settings settings) {
return createAllocationService(settings, getRandom());
}

public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
return new AllocationService(settings,
randomAllocationDeciders(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(settings, allocator, new BalancedShardsAllocator(settings)), ClusterInfoService.EMPTY);
}

public static AllocationService createAllocationService(Settings settings, Random random) {
return new AllocationService(settings,
randomAllocationDeciders(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS), random),
@@ -1251,7 +1251,7 @@ public void restartRandomNode(RestartCallback callback) throws Exception {
* Restarts a random data node in the cluster
*/
public void restartRandomDataNode() throws Exception {
restartRandomNode(EMPTY_CALLBACK);
restartRandomDataNode(EMPTY_CALLBACK);
}

/**
