Skip to content

Commit

Permalink
Only allow rebalance operations to run if all shard store data is ava…
Browse files Browse the repository at this point in the history
…ilable

This commit prevents running rebalance operations if the store allocator is
still fetching async shard / store data to prevent pre-mature rebalance decisions
which need to be reverted once shard store data is available. This is typically happening
on rolling restarts which can make those restarts extremely painful.

Closes elastic#14387
  • Loading branch information
s1monw committed Nov 6, 2015
1 parent 97644e3 commit 01ca95a
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,20 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {

@Override
public boolean rebalance(RoutingAllocation allocation) {
return allocator.rebalance(allocation);
final int numberOfInFlightFetch = gatewayAllocator.getNumberOfInFlightFetch();
if (numberOfInFlightFetch == 0) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
* we might end up with unnecessary rebalance operations which can be super confusion/frustrating
* since once the fetches come back we might just move all the shards back again.
* Therefore we only do a rebalance if we have fetched all information.
*/
return allocator.rebalance(allocation);
} else {
logger.debug("skip rebalance [{}] shard store fetch operations are still in-flight", numberOfInFlightFetch);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -624,4 +625,72 @@ public void testClusterAllActive3() {

assertThat(routingNodes.node("node3").isEmpty(), equalTo(true));
}

public void testRebalanceWhileShardFetching() {
final AtomicInteger numFetch = new AtomicInteger(1);
AllocationService strategy = createAllocationService(settingsBuilder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE,
ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoopGatewayAllocator() {
@Override
public int getNumberOfInFlightFetch() {
return numFetch.get();
}
});

MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
.build();

RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();

ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();

logger.info("start two nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
}

logger.debug("start all the primary shards for test");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test", INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}

logger.debug("now, start 1 more node, check that rebalancing will not happen since we have shard sync going on");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node2")))
.build();

RoutingAllocation.Result reroute = strategy.reroute(clusterState);
assertFalse(reroute.changed());
numFetch.set(0);
reroute = strategy.reroute(clusterState);
assertTrue(reroute.changed());
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
int numStarted = 0;
int numRelocating = 0;
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {

assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
if (routingTable.index("test").shard(i).primaryShard().state() == STARTED) {
numStarted++;
} else if (routingTable.index("test").shard(i).primaryShard().state() == RELOCATING) {
numRelocating++;
}
}
assertEquals(numStarted, 1);
assertEquals(numRelocating, 1);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;

Expand Down Expand Up @@ -79,6 +80,12 @@ public static AllocationService createAllocationService(Settings settings, Clust
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService);
}

public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
return new AllocationService(settings,
randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE);
}



public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class NoopGatewayAllocator extends GatewayAllocator {

public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();

private NoopGatewayAllocator() {
protected NoopGatewayAllocator() {
super(Settings.EMPTY, null, null);
}

Expand Down

0 comments on commit 01ca95a

Please sign in to comment.